From 144facb53e44435db165a124c9638abe54c1b62c Mon Sep 17 00:00:00 2001 From: Julia Bardi <90178898+juliaElastic@users.noreply.github.com> Date: Fri, 9 Feb 2024 09:44:58 +0100 Subject: [PATCH] make str log fields consistent (#3247) * make str log fields consistent * revert unintended change * removed unintended change * more log consistency, test * revision and coordinator idx to logger fields * use consistent revision and coordinator idx in logs --- internal/pkg/api/handleAck.go | 25 +++++++++++++------------ internal/pkg/api/handleAck_test.go | 28 ++++++++++++++++++++++++++++ internal/pkg/api/handleCheckin.go | 19 ++++++++++--------- internal/pkg/bulk/engine.go | 7 ++++--- internal/pkg/coordinator/monitor.go | 5 +++-- internal/pkg/dl/policies.go | 3 ++- internal/pkg/logger/ecs.go | 5 +++++ internal/pkg/policy/monitor.go | 16 ++++++++-------- internal/pkg/policy/policy_output.go | 8 ++++---- internal/pkg/policy/self.go | 9 +++++---- 10 files changed, 82 insertions(+), 43 deletions(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index ec707be21..a4a8ce447 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/policy" "github.com/elastic/fleet-server/v7/internal/pkg/smap" @@ -239,8 +240,8 @@ func (ack *AckT) handleAckEvents(ctx context.Context, zlog zerolog.Logger, agent span.Context.SetLabel("agent_id", agent.Agent.ID) span.Context.SetLabel("action_id", event.ActionId) log := zlog.With(). - Str("actionId", event.ActionId). - Str("agentId", event.AgentId). + Str(logger.ActionID, event.ActionId). + Str(logger.AgentID, event.AgentId). Time("timestamp", event.Timestamp). Int("n", n).Logger() log.Info().Msg("ack event") @@ -395,8 +396,8 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag Int64("agent.revisionIdx", currRev). Int64("agent.coordinatorIdx", currCoord). Str("rev.policyId", rev.PolicyID). - Int64("rev.revisionIdx", rev.RevisionIdx). - Int64("rev.coordinatorIdx", rev.CoordinatorIdx). + Int64(logger.RevisionIdx, rev.RevisionIdx). + Int64(logger.CoordinatorIdx, rev.CoordinatorIdx). Msg("ack policy revision") if ok && rev.PolicyID == agent.PolicyID && @@ -448,7 +449,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context, if outputName != "" { outputBulk := ack.bulk.GetBulker(outputName) if outputBulk != nil { - zlog.Debug().Str("outputName", outputName).Msg("Using output bulker in updateAPIKey") + zlog.Debug().Str(logger.PolicyOutputName, outputName).Msg("Using output bulker in updateAPIKey") bulk = outputBulk } } @@ -459,14 +460,14 @@ func (ack *AckT) updateAPIKey(ctx context.Context, zlog.Warn(). Err(err). Str(LogAPIKeyID, apiKeyID). - Str("outputName", outputName). + Str(logger.PolicyOutputName, outputName). Msg("Failed to read API Key roles") } else { // race when API key was invalidated before acking zlog.Info(). Err(err). Str(LogAPIKeyID, apiKeyID). - Str("outputName", outputName). + Str(logger.PolicyOutputName, outputName). Msg("Failed to read invalidated API Key roles") // prevents future checks @@ -482,14 +483,14 @@ func (ack *AckT) updateAPIKey(ctx context.Context, Msg("Failed to cleanup roles") } else if removedRolesCount > 0 { if err := bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil { - zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Str("outputName", outputName).Msg("Failed to update API Key") + zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Str(logger.PolicyOutputName, outputName).Msg("Failed to update API Key") } else { zlog.Debug(). Str("hash.sha256", permissionHash). Str(LogAPIKeyID, apiKeyID). RawJSON("roles", clean). Int("removedRoles", removedRolesCount). - Str("outputName", outputName). + Str(logger.PolicyOutputName, outputName). Msg("Updating agent record to pick up reduced roles.") } } @@ -722,17 +723,17 @@ func invalidateAPIKeys(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, // read output config from .fleet-policies, not filtering by policy id as agent could be reassigned policy, err := dl.QueryOutputFromPolicy(ctx, bulk, outputName) if err != nil || policy == nil { - zlog.Warn().Str("outputName", outputName).Any("ids", outputIds).Msg("Output policy not found, API keys will be orphaned") + zlog.Warn().Str(logger.PolicyOutputName, outputName).Any("ids", outputIds).Msg("Output policy not found, API keys will be orphaned") } else { outputBulk, _, err = bulk.CreateAndGetBulker(ctx, zlog, outputName, policy.Data.Outputs) if err != nil { - zlog.Warn().Str("outputName", outputName).Any("ids", outputIds).Msg("Failed to recreate output bulker, API keys will be orphaned") + zlog.Warn().Str(logger.PolicyOutputName, outputName).Any("ids", outputIds).Msg("Failed to recreate output bulker, API keys will be orphaned") } } } if outputBulk != nil { if err := outputBulk.APIKeyInvalidate(ctx, outputIds...); err != nil { - zlog.Info().Err(err).Strs("ids", outputIds).Str("outputName", outputName).Msg("Failed to invalidate API keys") + zlog.Info().Err(err).Strs("ids", outputIds).Str(logger.PolicyOutputName, outputName).Msg("Failed to invalidate API keys") } } } diff --git a/internal/pkg/api/handleAck_test.go b/internal/pkg/api/handleAck_test.go index 45839a916..0940b3d73 100644 --- a/internal/pkg/api/handleAck_test.go +++ b/internal/pkg/api/handleAck_test.go @@ -639,6 +639,34 @@ func TestInvalidateAPIKeysRemoteOutputReadFromPolicies(t *testing.T) { remoteBulker.AssertExpectations(t) } +func TestInvalidateAPIKeysRemoteOutputReadFromPoliciesNotFound(t *testing.T) { + toRetire := []model.ToRetireAPIKeyIdsItems{{ + ID: "toRetire1", + Output: "remote1", + }} + + remoteBulker := ftesting.NewMockBulk() + + bulkerFn := func(t *testing.T) *ftesting.MockBulk { + m := ftesting.NewMockBulk() + m.On("Search", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&es.ResultT{HitsT: es.HitsT{ + Hits: []es.HitT{}, + }}, nil).Once() + + m.On("GetBulker", "remote1").Return(nil) + return m + } + + bulker := bulkerFn(t) + + logger := testlog.SetLogger(t) + ack := &AckT{bulk: bulker} + ack.invalidateAPIKeys(context.Background(), logger, toRetire, "") + + bulker.AssertExpectations(t) + remoteBulker.AssertExpectations(t) +} + func TestAckHandleUpgrade(t *testing.T) { tests := []struct { name string diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 4ead71966..a6a1e73dd 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/checkin" "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" "github.com/elastic/fleet-server/v7/internal/pkg/policy" @@ -277,7 +278,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r defer func() { err := ct.pm.Unsubscribe(sub) if err != nil { - zlog.Error().Err(err).Str("policy_id", agent.PolicyID).Msg("unable to unsubscribe from policy") + zlog.Error().Err(err).Str(logger.PolicyID, agent.PolicyID).Msg("unable to unsubscribe from policy") } }() @@ -303,7 +304,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r // Initial update on checkin, and any user fields that might have changed err = ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, rawMeta, rawComponents, seqno, ver) if err != nil { - zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed") + zlog.Error().Err(err).Str(logger.AgentID, agent.Id).Msg("checkin failed") } // Initial fetch for pending actions @@ -357,7 +358,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r case <-tick.C: err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver) if err != nil { - zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed") + zlog.Error().Err(err).Str(logger.AgentID, agent.Id).Msg("checkin failed") } } } @@ -531,8 +532,8 @@ func (ct *CheckinT) writeResponse(zlog zerolog.Logger, w http.ResponseWriter, r zlog.Info(). Str("ackToken", fromPtr(resp.AckToken)). Str("createdAt", action.CreatedAt). - Str("id", action.Id). - Str("type", string(action.Type)). + Str(logger.ActionID, action.Id). + Str(logger.ActionType, string(action.Type)). Str("inputType", action.InputType). Int64("timeout", fromPtr(action.Timeout)). Msg("Action delivered to agent on checkin") @@ -640,7 +641,7 @@ func filterActions(zlog zerolog.Logger, agentID string, actions []model.Action) resp := make([]model.Action, 0, len(actions)) for _, action := range actions { if valid := validActionTypes[action.Type]; !valid { - zlog.Info().Str("agent_id", agentID).Str("action_id", action.ActionID).Str("type", action.Type).Msg("Removing action found in index from check in response") + zlog.Info().Str(logger.AgentID, agentID).Str(logger.ActionID, action.ActionID).Str(logger.ActionType, action.Type).Msg("Removing action found in index from check in response") continue } resp = append(resp, action) @@ -714,7 +715,7 @@ func convertActions(zlog zerolog.Logger, agentID string, actions []model.Action) for _, action := range actions { ad, err := convertActionData(ActionType(action.Type), action.Data) if err != nil { - zlog.Error().Err(err).Str("action_id", action.ActionID).Str("type", action.Type).Msg("Failed to convert action.Data") + zlog.Error().Err(err).Str(logger.ActionID, action.ActionID).Str(logger.ActionType, action.Type).Msg("Failed to convert action.Data") continue } r := Action{ @@ -765,8 +766,8 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a defer span.End() zlog = zlog.With(). Str("fleet.ctx", "processPolicy"). - Int64("fleet.policyRevision", pp.Policy.RevisionIdx). - Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx). + Int64(logger.RevisionIdx, pp.Policy.RevisionIdx). + Int64(logger.CoordinatorIdx, pp.Policy.CoordinatorIdx). Str(LogPolicyID, pp.Policy.PolicyID). Logger() diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 799583b11..daf5572e4 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -18,6 +18,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/build" "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" @@ -174,7 +175,7 @@ func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, ou errCh := make(chan error) go func() { runFunc := func() (err error) { - zlog.Debug().Str("outputName", outputName).Msg("Bulker started") + zlog.Debug().Str(logger.PolicyOutputName, outputName).Msg("Bulker started") return newBulker.Run(bulkCtx) } @@ -183,9 +184,9 @@ func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, ou go func() { select { case err = <-errCh: - zlog.Error().Err(err).Str("outputName", outputName).Msg("Bulker error") + zlog.Error().Err(err).Str(logger.PolicyOutputName, outputName).Msg("Bulker error") case <-bulkCtx.Done(): - zlog.Debug().Str("outputName", outputName).Msg("Bulk context done") + zlog.Debug().Str(logger.PolicyOutputName, outputName).Msg("Bulk context done") err = bulkCtx.Err() } }() diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 692292fa2..907be1899 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" "github.com/elastic/fleet-server/v7/internal/pkg/sleep" @@ -426,12 +427,12 @@ func runCoordinatorOutput(ctx context.Context, cord Coordinator, bulker bulk.Bul for { select { case p := <-cord.Output(): - s := l.With().Int64(dl.FieldRevisionIdx, p.RevisionIdx).Int64(dl.FieldCoordinatorIdx, p.CoordinatorIdx).Logger() + s := l.With().Int64(logger.RevisionIdx, p.RevisionIdx).Int64(logger.CoordinatorIdx, p.CoordinatorIdx).Logger() _, err := dl.CreatePolicy(ctx, bulker, p, dl.WithIndexName(policiesIndex)) if err != nil { s.Err(err).Msg("Policy coordinator failed to add a new policy revision") } else { - s.Info().Int64("revision_id", p.RevisionIdx).Msg("Policy coordinator added a new policy revision") + s.Info().Msg("Policy coordinator added a new policy revision") } case <-ctx.Done(): return diff --git a/internal/pkg/dl/policies.go b/internal/pkg/dl/policies.go index ae67179e9..6e67d64b1 100644 --- a/internal/pkg/dl/policies.go +++ b/internal/pkg/dl/policies.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/rs/zerolog" @@ -110,6 +111,6 @@ func QueryOutputFromPolicy(ctx context.Context, bulker bulk.Bulk, outputName str return &policy, nil } } - zerolog.Ctx(ctx).Debug().Str("outputName", outputName).Msg("policy with output not found") + zerolog.Ctx(ctx).Debug().Str(logger.PolicyOutputName, outputName).Msg("policy with output not found") return nil, nil } diff --git a/internal/pkg/logger/ecs.go b/internal/pkg/logger/ecs.go index 8a6c8a362..9dd44cb7f 100644 --- a/internal/pkg/logger/ecs.go +++ b/internal/pkg/logger/ecs.go @@ -66,4 +66,9 @@ const ( EnrollAPIKeyID = "fleet.enroll.apikey.id" AccessAPIKeyID = "fleet.access.apikey.id" DefaultOutputAPIKeyID = "fleet.default.apikey.id" + ActionID = "fleet.action.id" + ActionType = "fleet.action.type" + PolicyOutputName = "fleet.policy.output.name" + RevisionIdx = "fleet.revision_idx" + CoordinatorIdx = "fleet.coordinator_idx" ) diff --git a/internal/pkg/policy/monitor.go b/internal/pkg/policy/monitor.go index 61c9bd680..1d23fb48b 100644 --- a/internal/pkg/policy/monitor.go +++ b/internal/pkg/policy/monitor.go @@ -269,8 +269,8 @@ func (m *monitorT) dispatchPending(ctx context.Context) { Str(logger.PolicyID, s.policyID). Int64("subscription_revision_idx", s.revIdx). Int64("subscription_coordinator_idx", s.coordIdx). - Int64(dl.FieldRevisionIdx, policy.pp.Policy.RevisionIdx). - Int64(dl.FieldCoordinatorIdx, policy.pp.Policy.CoordinatorIdx). + Int64(logger.RevisionIdx, s.revIdx). + Int64(logger.CoordinatorIdx, s.coordIdx). Msg("dispatch policy change") default: // Should never block on a channel; we created a channel of size one. @@ -377,8 +377,8 @@ func (m *monitorT) updatePolicy(ctx context.Context, pp *ParsedPolicy) bool { zlog := m.log.With(). Str(logger.PolicyID, newPolicy.PolicyID). - Int64(dl.FieldRevisionIdx, newPolicy.RevisionIdx). - Int64(dl.FieldCoordinatorIdx, newPolicy.CoordinatorIdx). + Int64(logger.RevisionIdx, newPolicy.RevisionIdx). + Int64(logger.CoordinatorIdx, newPolicy.CoordinatorIdx). Logger() if newPolicy.CoordinatorIdx <= 0 { @@ -475,8 +475,8 @@ func (m *monitorT) Subscribe(agentID string, policyID string, revisionIdx int64, m.log.Debug(). Str(logger.AgentID, agentID). Str(logger.PolicyID, policyID). - Int64(dl.FieldRevisionIdx, revisionIdx). - Int64(dl.FieldCoordinatorIdx, coordinatorIdx). + Int64(logger.RevisionIdx, revisionIdx). + Int64(logger.CoordinatorIdx, coordinatorIdx). Msg("subscribed to policy monitor") s := NewSub( @@ -537,8 +537,8 @@ func (m *monitorT) Unsubscribe(sub Subscription) error { m.log.Debug(). Str(logger.AgentID, s.agentID). Str(logger.PolicyID, s.policyID). - Int64(dl.FieldRevisionIdx, s.revIdx). - Int64(dl.FieldCoordinatorIdx, s.coordIdx). + Int64(logger.RevisionIdx, s.revIdx). + Int64(logger.CoordinatorIdx, s.coordIdx). Msg("unsubscribe") return nil diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 1eabe158e..1d099cd8b 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -51,7 +51,7 @@ func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.B span.Context.SetLabel("output_type", p.Type) zlog = zlog.With(). Str(logger.AgentID, agent.Id). - Str("fleet.policy.output.name", p.Name).Logger() + Str(logger.PolicyOutputName, p.Name).Logger() switch p.Type { case OutputTypeElasticsearch: @@ -125,7 +125,7 @@ func (p *Output) prepareElasticsearch( } } if !found { - zlog.Info().Str(logger.APIKeyID, agentOutput.APIKeyID).Str("outputName", agentOutputName).Msg("Output removed, will retire API key") + zlog.Info().Str(logger.APIKeyID, agentOutput.APIKeyID).Str(logger.PolicyOutputName, agentOutputName).Msg("Output removed, will retire API key") toRetireAPIKeys = &model.ToRetireAPIKeyIdsItems{ ID: agentOutput.APIKeyID, RetiredAt: time.Now().UTC().Format(time.RFC3339), @@ -270,10 +270,10 @@ func (p *Output) prepareElasticsearch( State: client.UnitStateDegraded.String(), Message: fmt.Sprintf("remote ES could not create API key due to error: %v", err), } - zerolog.Ctx(ctx).Warn().Err(err).Str("outputName", p.Name).Msg(doc.Message) + zerolog.Ctx(ctx).Warn().Err(err).Str(logger.PolicyOutputName, p.Name).Msg(doc.Message) if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { - zlog.Error().Err(err).Str("outputName", p.Name).Msg("error writing output health") + zlog.Error().Err(err).Str(logger.PolicyOutputName, p.Name).Msg("error writing output health") } } diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go index 4803cf165..d468b0f39 100644 --- a/internal/pkg/policy/self.go +++ b/internal/pkg/policy/self.go @@ -20,6 +20,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" "github.com/elastic/fleet-server/v7/internal/pkg/state" @@ -249,7 +250,7 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error return state, nil } -func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Logger) { +func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, zlog zerolog.Logger) { //pinging logic bulkerMap := bulker.GetBulkerMap() for outputName, outputBulker := range bulkerMap { @@ -262,15 +263,15 @@ func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Lo if err != nil { doc.State = client.UnitStateDegraded.String() doc.Message = fmt.Sprintf("remote ES is not reachable due to error: %s", err.Error()) - logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message) + zlog.Error().Err(err).Str(logger.PolicyOutputName, outputName).Msg(doc.Message) } else if res.StatusCode != 200 { doc.State = client.UnitStateDegraded.String() doc.Message = fmt.Sprintf("remote ES is not reachable due to unexpected status code %d", res.StatusCode) - logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message) + zlog.Error().Err(err).Str(logger.PolicyOutputName, outputName).Msg(doc.Message) } if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { - logger.Error().Err(err).Str("outputName", outputName).Msg("error writing output health") + zlog.Error().Err(err).Str(logger.PolicyOutputName, outputName).Msg("error writing output health") } } }