Skip to content

Commit

Permalink
make str log fields consistent (#3247)
Browse files Browse the repository at this point in the history
* make str log fields consistent

* revert unintended change

* removed unintended change

* more log consistency, test

* revision and coordinator idx to logger fields

* use consistent revision and coordinator idx in logs
  • Loading branch information
juliaElastic authored Feb 9, 2024
1 parent d2dcf16 commit 144facb
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 43 deletions.
25 changes: 13 additions & 12 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
"github.com/elastic/fleet-server/v7/internal/pkg/smap"
Expand Down Expand Up @@ -239,8 +240,8 @@ func (ack *AckT) handleAckEvents(ctx context.Context, zlog zerolog.Logger, agent
span.Context.SetLabel("agent_id", agent.Agent.ID)
span.Context.SetLabel("action_id", event.ActionId)
log := zlog.With().
Str("actionId", event.ActionId).
Str("agentId", event.AgentId).
Str(logger.ActionID, event.ActionId).
Str(logger.AgentID, event.AgentId).
Time("timestamp", event.Timestamp).
Int("n", n).Logger()
log.Info().Msg("ack event")
Expand Down Expand Up @@ -395,8 +396,8 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
Int64("agent.revisionIdx", currRev).
Int64("agent.coordinatorIdx", currCoord).
Str("rev.policyId", rev.PolicyID).
Int64("rev.revisionIdx", rev.RevisionIdx).
Int64("rev.coordinatorIdx", rev.CoordinatorIdx).
Int64(logger.RevisionIdx, rev.RevisionIdx).
Int64(logger.CoordinatorIdx, rev.CoordinatorIdx).
Msg("ack policy revision")

if ok && rev.PolicyID == agent.PolicyID &&
Expand Down Expand Up @@ -448,7 +449,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
if outputName != "" {
outputBulk := ack.bulk.GetBulker(outputName)
if outputBulk != nil {
zlog.Debug().Str("outputName", outputName).Msg("Using output bulker in updateAPIKey")
zlog.Debug().Str(logger.PolicyOutputName, outputName).Msg("Using output bulker in updateAPIKey")
bulk = outputBulk
}
}
Expand All @@ -459,14 +460,14 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
zlog.Warn().
Err(err).
Str(LogAPIKeyID, apiKeyID).
Str("outputName", outputName).
Str(logger.PolicyOutputName, outputName).
Msg("Failed to read API Key roles")
} else {
// race when API key was invalidated before acking
zlog.Info().
Err(err).
Str(LogAPIKeyID, apiKeyID).
Str("outputName", outputName).
Str(logger.PolicyOutputName, outputName).
Msg("Failed to read invalidated API Key roles")

// prevents future checks
Expand All @@ -482,14 +483,14 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
Msg("Failed to cleanup roles")
} else if removedRolesCount > 0 {
if err := bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Str("outputName", outputName).Msg("Failed to update API Key")
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Str(logger.PolicyOutputName, outputName).Msg("Failed to update API Key")
} else {
zlog.Debug().
Str("hash.sha256", permissionHash).
Str(LogAPIKeyID, apiKeyID).
RawJSON("roles", clean).
Int("removedRoles", removedRolesCount).
Str("outputName", outputName).
Str(logger.PolicyOutputName, outputName).
Msg("Updating agent record to pick up reduced roles.")
}
}
Expand Down Expand Up @@ -722,17 +723,17 @@ func invalidateAPIKeys(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk,
// read output config from .fleet-policies, not filtering by policy id as agent could be reassigned
policy, err := dl.QueryOutputFromPolicy(ctx, bulk, outputName)
if err != nil || policy == nil {
zlog.Warn().Str("outputName", outputName).Any("ids", outputIds).Msg("Output policy not found, API keys will be orphaned")
zlog.Warn().Str(logger.PolicyOutputName, outputName).Any("ids", outputIds).Msg("Output policy not found, API keys will be orphaned")
} else {
outputBulk, _, err = bulk.CreateAndGetBulker(ctx, zlog, outputName, policy.Data.Outputs)
if err != nil {
zlog.Warn().Str("outputName", outputName).Any("ids", outputIds).Msg("Failed to recreate output bulker, API keys will be orphaned")
zlog.Warn().Str(logger.PolicyOutputName, outputName).Any("ids", outputIds).Msg("Failed to recreate output bulker, API keys will be orphaned")
}
}
}
if outputBulk != nil {
if err := outputBulk.APIKeyInvalidate(ctx, outputIds...); err != nil {
zlog.Info().Err(err).Strs("ids", outputIds).Str("outputName", outputName).Msg("Failed to invalidate API keys")
zlog.Info().Err(err).Strs("ids", outputIds).Str(logger.PolicyOutputName, outputName).Msg("Failed to invalidate API keys")
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,34 @@ func TestInvalidateAPIKeysRemoteOutputReadFromPolicies(t *testing.T) {
remoteBulker.AssertExpectations(t)
}

func TestInvalidateAPIKeysRemoteOutputReadFromPoliciesNotFound(t *testing.T) {
toRetire := []model.ToRetireAPIKeyIdsItems{{
ID: "toRetire1",
Output: "remote1",
}}

remoteBulker := ftesting.NewMockBulk()

bulkerFn := func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Search", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&es.ResultT{HitsT: es.HitsT{
Hits: []es.HitT{},
}}, nil).Once()

m.On("GetBulker", "remote1").Return(nil)
return m
}

bulker := bulkerFn(t)

logger := testlog.SetLogger(t)
ack := &AckT{bulk: bulker}
ack.invalidateAPIKeys(context.Background(), logger, toRetire, "")

bulker.AssertExpectations(t)
remoteBulker.AssertExpectations(t)
}

func TestAckHandleUpgrade(t *testing.T) {
tests := []struct {
name string
Expand Down
19 changes: 10 additions & 9 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/checkin"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
Expand Down Expand Up @@ -277,7 +278,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
defer func() {
err := ct.pm.Unsubscribe(sub)
if err != nil {
zlog.Error().Err(err).Str("policy_id", agent.PolicyID).Msg("unable to unsubscribe from policy")
zlog.Error().Err(err).Str(logger.PolicyID, agent.PolicyID).Msg("unable to unsubscribe from policy")
}
}()

Expand All @@ -303,7 +304,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
// Initial update on checkin, and any user fields that might have changed
err = ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, rawMeta, rawComponents, seqno, ver)
if err != nil {
zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed")
zlog.Error().Err(err).Str(logger.AgentID, agent.Id).Msg("checkin failed")
}

// Initial fetch for pending actions
Expand Down Expand Up @@ -357,7 +358,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
case <-tick.C:
err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver)
if err != nil {
zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed")
zlog.Error().Err(err).Str(logger.AgentID, agent.Id).Msg("checkin failed")
}
}
}
Expand Down Expand Up @@ -531,8 +532,8 @@ func (ct *CheckinT) writeResponse(zlog zerolog.Logger, w http.ResponseWriter, r
zlog.Info().
Str("ackToken", fromPtr(resp.AckToken)).
Str("createdAt", action.CreatedAt).
Str("id", action.Id).
Str("type", string(action.Type)).
Str(logger.ActionID, action.Id).
Str(logger.ActionType, string(action.Type)).
Str("inputType", action.InputType).
Int64("timeout", fromPtr(action.Timeout)).
Msg("Action delivered to agent on checkin")
Expand Down Expand Up @@ -640,7 +641,7 @@ func filterActions(zlog zerolog.Logger, agentID string, actions []model.Action)
resp := make([]model.Action, 0, len(actions))
for _, action := range actions {
if valid := validActionTypes[action.Type]; !valid {
zlog.Info().Str("agent_id", agentID).Str("action_id", action.ActionID).Str("type", action.Type).Msg("Removing action found in index from check in response")
zlog.Info().Str(logger.AgentID, agentID).Str(logger.ActionID, action.ActionID).Str(logger.ActionType, action.Type).Msg("Removing action found in index from check in response")
continue
}
resp = append(resp, action)
Expand Down Expand Up @@ -714,7 +715,7 @@ func convertActions(zlog zerolog.Logger, agentID string, actions []model.Action)
for _, action := range actions {
ad, err := convertActionData(ActionType(action.Type), action.Data)
if err != nil {
zlog.Error().Err(err).Str("action_id", action.ActionID).Str("type", action.Type).Msg("Failed to convert action.Data")
zlog.Error().Err(err).Str(logger.ActionID, action.ActionID).Str(logger.ActionType, action.Type).Msg("Failed to convert action.Data")
continue
}
r := Action{
Expand Down Expand Up @@ -765,8 +766,8 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
defer span.End()
zlog = zlog.With().
Str("fleet.ctx", "processPolicy").
Int64("fleet.policyRevision", pp.Policy.RevisionIdx).
Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx).
Int64(logger.RevisionIdx, pp.Policy.RevisionIdx).
Int64(logger.CoordinatorIdx, pp.Policy.CoordinatorIdx).
Str(LogPolicyID, pp.Policy.PolicyID).
Logger()

Expand Down
7 changes: 4 additions & 3 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/build"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
Expand Down Expand Up @@ -174,7 +175,7 @@ func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, ou
errCh := make(chan error)
go func() {
runFunc := func() (err error) {
zlog.Debug().Str("outputName", outputName).Msg("Bulker started")
zlog.Debug().Str(logger.PolicyOutputName, outputName).Msg("Bulker started")
return newBulker.Run(bulkCtx)
}

Expand All @@ -183,9 +184,9 @@ func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, ou
go func() {
select {
case err = <-errCh:
zlog.Error().Err(err).Str("outputName", outputName).Msg("Bulker error")
zlog.Error().Err(err).Str(logger.PolicyOutputName, outputName).Msg("Bulker error")
case <-bulkCtx.Done():
zlog.Debug().Str("outputName", outputName).Msg("Bulk context done")
zlog.Debug().Str(logger.PolicyOutputName, outputName).Msg("Bulk context done")
err = bulkCtx.Err()
}
}()
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/coordinator/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/sleep"
Expand Down Expand Up @@ -426,12 +427,12 @@ func runCoordinatorOutput(ctx context.Context, cord Coordinator, bulker bulk.Bul
for {
select {
case p := <-cord.Output():
s := l.With().Int64(dl.FieldRevisionIdx, p.RevisionIdx).Int64(dl.FieldCoordinatorIdx, p.CoordinatorIdx).Logger()
s := l.With().Int64(logger.RevisionIdx, p.RevisionIdx).Int64(logger.CoordinatorIdx, p.CoordinatorIdx).Logger()
_, err := dl.CreatePolicy(ctx, bulker, p, dl.WithIndexName(policiesIndex))
if err != nil {
s.Err(err).Msg("Policy coordinator failed to add a new policy revision")
} else {
s.Info().Int64("revision_id", p.RevisionIdx).Msg("Policy coordinator added a new policy revision")
s.Info().Msg("Policy coordinator added a new policy revision")
}
case <-ctx.Done():
return
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/dl/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/rs/zerolog"

Expand Down Expand Up @@ -110,6 +111,6 @@ func QueryOutputFromPolicy(ctx context.Context, bulker bulk.Bulk, outputName str
return &policy, nil
}
}
zerolog.Ctx(ctx).Debug().Str("outputName", outputName).Msg("policy with output not found")
zerolog.Ctx(ctx).Debug().Str(logger.PolicyOutputName, outputName).Msg("policy with output not found")
return nil, nil
}
5 changes: 5 additions & 0 deletions internal/pkg/logger/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,9 @@ const (
EnrollAPIKeyID = "fleet.enroll.apikey.id"
AccessAPIKeyID = "fleet.access.apikey.id"
DefaultOutputAPIKeyID = "fleet.default.apikey.id"
ActionID = "fleet.action.id"
ActionType = "fleet.action.type"
PolicyOutputName = "fleet.policy.output.name"
RevisionIdx = "fleet.revision_idx"
CoordinatorIdx = "fleet.coordinator_idx"
)
16 changes: 8 additions & 8 deletions internal/pkg/policy/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ func (m *monitorT) dispatchPending(ctx context.Context) {
Str(logger.PolicyID, s.policyID).
Int64("subscription_revision_idx", s.revIdx).
Int64("subscription_coordinator_idx", s.coordIdx).
Int64(dl.FieldRevisionIdx, policy.pp.Policy.RevisionIdx).
Int64(dl.FieldCoordinatorIdx, policy.pp.Policy.CoordinatorIdx).
Int64(logger.RevisionIdx, s.revIdx).
Int64(logger.CoordinatorIdx, s.coordIdx).
Msg("dispatch policy change")
default:
// Should never block on a channel; we created a channel of size one.
Expand Down Expand Up @@ -377,8 +377,8 @@ func (m *monitorT) updatePolicy(ctx context.Context, pp *ParsedPolicy) bool {

zlog := m.log.With().
Str(logger.PolicyID, newPolicy.PolicyID).
Int64(dl.FieldRevisionIdx, newPolicy.RevisionIdx).
Int64(dl.FieldCoordinatorIdx, newPolicy.CoordinatorIdx).
Int64(logger.RevisionIdx, newPolicy.RevisionIdx).
Int64(logger.CoordinatorIdx, newPolicy.CoordinatorIdx).
Logger()

if newPolicy.CoordinatorIdx <= 0 {
Expand Down Expand Up @@ -475,8 +475,8 @@ func (m *monitorT) Subscribe(agentID string, policyID string, revisionIdx int64,
m.log.Debug().
Str(logger.AgentID, agentID).
Str(logger.PolicyID, policyID).
Int64(dl.FieldRevisionIdx, revisionIdx).
Int64(dl.FieldCoordinatorIdx, coordinatorIdx).
Int64(logger.RevisionIdx, revisionIdx).
Int64(logger.CoordinatorIdx, coordinatorIdx).
Msg("subscribed to policy monitor")

s := NewSub(
Expand Down Expand Up @@ -537,8 +537,8 @@ func (m *monitorT) Unsubscribe(sub Subscription) error {
m.log.Debug().
Str(logger.AgentID, s.agentID).
Str(logger.PolicyID, s.policyID).
Int64(dl.FieldRevisionIdx, s.revIdx).
Int64(dl.FieldCoordinatorIdx, s.coordIdx).
Int64(logger.RevisionIdx, s.revIdx).
Int64(logger.CoordinatorIdx, s.coordIdx).
Msg("unsubscribe")

return nil
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/policy/policy_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.B
span.Context.SetLabel("output_type", p.Type)
zlog = zlog.With().
Str(logger.AgentID, agent.Id).
Str("fleet.policy.output.name", p.Name).Logger()
Str(logger.PolicyOutputName, p.Name).Logger()

switch p.Type {
case OutputTypeElasticsearch:
Expand Down Expand Up @@ -125,7 +125,7 @@ func (p *Output) prepareElasticsearch(
}
}
if !found {
zlog.Info().Str(logger.APIKeyID, agentOutput.APIKeyID).Str("outputName", agentOutputName).Msg("Output removed, will retire API key")
zlog.Info().Str(logger.APIKeyID, agentOutput.APIKeyID).Str(logger.PolicyOutputName, agentOutputName).Msg("Output removed, will retire API key")
toRetireAPIKeys = &model.ToRetireAPIKeyIdsItems{
ID: agentOutput.APIKeyID,
RetiredAt: time.Now().UTC().Format(time.RFC3339),
Expand Down Expand Up @@ -270,10 +270,10 @@ func (p *Output) prepareElasticsearch(
State: client.UnitStateDegraded.String(),
Message: fmt.Sprintf("remote ES could not create API key due to error: %v", err),
}
zerolog.Ctx(ctx).Warn().Err(err).Str("outputName", p.Name).Msg(doc.Message)
zerolog.Ctx(ctx).Warn().Err(err).Str(logger.PolicyOutputName, p.Name).Msg(doc.Message)

if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
zlog.Error().Err(err).Str("outputName", p.Name).Msg("error writing output health")
zlog.Error().Err(err).Str(logger.PolicyOutputName, p.Name).Msg("error writing output health")
}
}

Expand Down
9 changes: 5 additions & 4 deletions internal/pkg/policy/self.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/state"
Expand Down Expand Up @@ -249,7 +250,7 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error
return state, nil
}

func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Logger) {
func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, zlog zerolog.Logger) {
//pinging logic
bulkerMap := bulker.GetBulkerMap()
for outputName, outputBulker := range bulkerMap {
Expand All @@ -262,15 +263,15 @@ func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Lo
if err != nil {
doc.State = client.UnitStateDegraded.String()
doc.Message = fmt.Sprintf("remote ES is not reachable due to error: %s", err.Error())
logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message)
zlog.Error().Err(err).Str(logger.PolicyOutputName, outputName).Msg(doc.Message)

} else if res.StatusCode != 200 {
doc.State = client.UnitStateDegraded.String()
doc.Message = fmt.Sprintf("remote ES is not reachable due to unexpected status code %d", res.StatusCode)
logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message)
zlog.Error().Err(err).Str(logger.PolicyOutputName, outputName).Msg(doc.Message)
}
if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
logger.Error().Err(err).Str("outputName", outputName).Msg("error writing output health")
zlog.Error().Err(err).Str(logger.PolicyOutputName, outputName).Msg("error writing output health")
}
}
}
Expand Down

0 comments on commit 144facb

Please sign in to comment.