Add / report persistent policy error metadata in Coordinator (#3076)
faec authored Jul 24, 2023
1 parent 2a3dfaa commit 4269472
Showing 6 changed files with 379 additions and 173 deletions.
16 changes: 10 additions & 6 deletions control_v2.proto
@@ -164,19 +164,23 @@ message StateAgentInfo {
}

// StateResponse is the current state of Elastic Agent.
// Next unused id: 7
message StateResponse {
// Overall information of Elastic Agent.
StateAgentInfo info = 1;
// Overall state of Elastic Agent.

// Overall state + message of Elastic Agent, aggregating errors in other
// states and components
State state = 2;
// Overall state message of Elastic Agent.
string message = 3;
// State of each component in Elastic Agent.
repeated ComponentState components = 4;
// Fleet connectivity state of Elastic Agent.

// Fleet state: healthy / "Connected" if the last RPC call to Fleet
// succeeded, otherwise failed with the associated error string.
State fleetState = 5;
// Fleet connectivity state message of Elastic Agent.
string fleetMessage = 6;

// State of each component in Elastic Agent.
repeated ComponentState components = 4;
}

// DiagnosticFileResult is a file result from a diagnostic result.
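Not part of this commit: a minimal consumer-side sketch of the new Fleet-state fields, assuming standard protoc-generated Go bindings for control_v2.proto. The cproto import path and the State_HEALTHY / State_FAILED constant names are assumptions, not confirmed by this diff.

package main

import (
	"fmt"

	// Import path assumed for the protoc-generated Go bindings of
	// control_v2.proto; it is not shown in this diff.
	"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
)

// summarize prints the overall agent state next to the Fleet connectivity
// state carried by the fleetState/fleetMessage fields added above.
func summarize(resp *cproto.StateResponse) {
	fmt.Printf("agent: %s - %s\n", resp.State, resp.Message)
	// fleetState is Healthy ("Connected") only when the last RPC call to
	// Fleet succeeded; otherwise fleetMessage carries the error string.
	fmt.Printf("fleet: %s - %s\n", resp.FleetState, resp.FleetMessage)
}

func main() {
	// Enum constant names are assumed from the usual protoc-gen-go naming;
	// values here are illustrative only.
	summarize(&cproto.StateResponse{
		State:        cproto.State_HEALTHY,
		Message:      "Running",
		FleetState:   cproto.State_FAILED,
		FleetMessage: "rpc error: context deadline exceeded",
	})
}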
151 changes: 94 additions & 57 deletions internal/pkg/agent/application/coordinator/coordinator.go
@@ -214,11 +214,26 @@ type Coordinator struct {
// into the reported state before broadcasting -- State() will report
// agentclient.Failed if one of these is set, even if the underlying
// coordinator state is agentclient.Healthy.
runtimeMgrErr error
runtimeMgrErr error // Currently unused
configMgrErr error
actionsErr error
varsMgrErr error

// Errors resulting from different possible failure modes when setting a
// new policy. Right now there are three different stages where a policy
// update can fail:
// - in generateAST, converting the policy to an AST
// - in process, converting the AST and vars into a full component model
// - while sending the final component model to the runtime manager
//
// The plan is to improve our preprocessing so we can always detect
// failures immediately https://github.com/elastic/elastic-agent/issues/2887.
// For now, we track three distinct errors for those three failure types,
// and merge them into a readable error in generateReportableState.
configErr error
componentGenErr error
runtimeUpdateErr error

// The raw policy before spec lookup or variable substitution
ast *transpiler.AST
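Not part of this diff: the three error fields above are merged into the reported state by generateReportableState, whose body is outside this hunk. As a hedged illustration only (a sketch, not the committed implementation; policyUpdateError is an invented helper name), the merge could look roughly like:

// Hypothetical sketch (assumes fmt is imported): collapse the three
// policy-update error fields into a single reportable error, checked in
// pipeline order: AST generation, component generation, runtime update.
func (c *Coordinator) policyUpdateError() error {
	switch {
	case c.configErr != nil:
		return fmt.Errorf("converting policy to AST: %w", c.configErr)
	case c.componentGenErr != nil:
		return fmt.Errorf("generating component model: %w", c.componentGenErr)
	case c.runtimeUpdateErr != nil:
		return fmt.Errorf("sending component model to runtime manager: %w", c.runtimeUpdateErr)
	}
	return nil
}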

Expand Down Expand Up @@ -567,28 +582,28 @@ func (c *Coordinator) Run(ctx context.Context) error {
go c.watchRuntimeComponents(watchCtx)

for {
c.setState(agentclient.Starting, "Waiting for initial configuration and composable variables")
c.setCoordinatorState(agentclient.Starting, "Waiting for initial configuration and composable variables")
// The usual state refresh happens in the main run loop in Coordinator.runner,
// so before/after the runner call we need to trigger state change broadcasts
// manually.
c.refreshState()
err := c.runner(ctx)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
c.setState(agentclient.Stopped, "Requested to be stopped")
c.setCoordinatorState(agentclient.Stopped, "Requested to be stopped")
c.setFleetState(agentclient.Stopped, "Requested to be stopped")
c.refreshState()
// do not restart
return err
}
if errors.Is(err, ErrFatalCoordinator) {
c.setState(agentclient.Failed, "Fatal coordinator error")
c.setCoordinatorState(agentclient.Failed, "Fatal coordinator error")
c.setFleetState(agentclient.Stopped, "Fatal coordinator error")
c.refreshState()
return err
}
}
c.setState(agentclient.Failed, fmt.Sprintf("Coordinator failed and will be restarted: %s", err))
c.setCoordinatorState(agentclient.Failed, fmt.Sprintf("Coordinator failed and will be restarted: %s", err))
c.refreshState()
c.logger.Errorf("coordinator failed and will be restarted: %s", err)
}
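Not part of this diff: setState is split into setCoordinatorState and setFleetState so the overall agent state and the Fleet connectivity state are tracked separately, matching the new fleetState/fleetMessage proto fields. The setter bodies are outside this hunk; as a rough sketch only (field names are assumptions), the split might look like:

// Hypothetical sketch, not the committed code: each setter updates only its
// own pair of fields; refreshState() later broadcasts the combined snapshot.
func (c *Coordinator) setCoordinatorState(state agentclient.State, message string) {
	c.state.State = state
	c.state.Message = message
}

func (c *Coordinator) setFleetState(state agentclient.State, message string) {
	c.state.FleetState = state
	c.state.FleetMessage = message
}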
@@ -857,31 +872,36 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {

case change := <-c.managerChans.configManagerUpdate:
if err := c.processConfig(ctx, change.Config()); err != nil {
c.setState(agentclient.Failed, err.Error())
c.logger.Errorf("%s", err)
c.logger.Errorf("applying new policy: %s", err.Error())
change.Fail(err)
} else {
if err := change.Ack(); err != nil {
err = fmt.Errorf("failed to ack configuration change: %w", err)
c.setState(agentclient.Failed, err.Error())
c.logger.Errorf("%s", err)
// Workaround: setConfigManagerError is usually used by the config
// manager to report failed ACKs / etc when communicating with Fleet.
// We need to report a failed ACK here, but the policy change has
// already been successfully applied so we don't want to report it as
// a general Coordinator or policy failure.
// This arises uniquely here because this is the only case where an
// action is responsible for reporting the failure of its own ACK
// call. The "correct" fix is to make this Ack() call unfailable
// and handle ACK retries and reporting in the config manager like
// with other action types -- this error would then end up invoking
// setConfigManagerError "organically" via the config manager's
// reporting channel. In the meantime, we do it manually.
c.setConfigManagerError(err)
c.logger.Errorf("%s", err.Error())
}
}

case vars := <-c.managerChans.varsManagerUpdate:
if ctx.Err() == nil {
if err := c.processVars(ctx, vars); err != nil {
c.setState(agentclient.Failed, err.Error())
c.logger.Errorf("%s", err)
}
c.processVars(ctx, vars)
}

case ll := <-c.logLevelCh:
if ctx.Err() == nil {
if err := c.processLogLevel(ctx, ll); err != nil {
c.setState(agentclient.Failed, err.Error())
c.logger.Errorf("%s", err)
}
c.processLogLevel(ctx, ll)
}
}

@@ -900,7 +920,34 @@ func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (er
span.End()
}()

if err := info.InjectAgentConfig(cfg); err != nil {
err = c.generateAST(cfg)
c.setConfigError(err)
if err != nil {
return err
}

// Disabled for 8.8.0 release in order to limit the surface
// https://github.com/elastic/security-team/issues/6501

// c.setProtection(protectionConfig)

if c.vars != nil {
return c.refreshComponentModel(ctx)
}
return nil
}

// Generate the AST for a new incoming configuration and, if successful,
// assign it to the Coordinator's ast field.
func (c *Coordinator) generateAST(cfg *config.Config) (err error) {
defer func() {
// Update configErr, which stores the results of the most recent policy
// update and is merged into the Coordinator state in
// generateReportableState.
c.setConfigError(err)
}()

if err = info.InjectAgentConfig(cfg); err != nil {
return err
}

@@ -943,63 +990,47 @@ func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (er
}

c.ast = rawAst

// Disabled for 8.8.0 release in order to limit the surface
// https://github.com/elastic/security-team/issues/6501

// c.setProtection(protectionConfig)

if c.vars != nil {
return c.process(ctx)
}
return nil
}
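generateAST records its outcome through a deferred call over the named return value err, so every return path updates configErr exactly once. Below is a standalone illustration of that Go pattern; all names in it are invented for the example, not taken from the agent code.

package main

import (
	"errors"
	"fmt"
)

// lastErr stands in for a persistent error field such as configErr.
var lastErr error

func recordError(err error) { lastErr = err }

// update records whichever value it ends up returning, from any return path,
// because the deferred closure reads the named return value err.
func update(fail bool) (err error) {
	defer func() { recordError(err) }()
	if fail {
		return errors.New("stage one failed")
	}
	return nil
}

func main() {
	_ = update(true)
	fmt.Println("recorded:", lastErr) // recorded: stage one failed
	_ = update(false)
	fmt.Println("recorded:", lastErr) // recorded: <nil>
}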

// processVars updates the transpiler vars in the Coordinator.
// Called on the main Coordinator goroutine.
func (c *Coordinator) processVars(ctx context.Context, vars []*transpiler.Vars) (err error) {
span, ctx := apm.StartSpan(ctx, "vars", "app.internal")
defer func() {
apm.CaptureError(ctx, err).Send()
span.End()
}()

func (c *Coordinator) processVars(ctx context.Context, vars []*transpiler.Vars) {
c.vars = vars

if c.ast != nil {
return c.process(ctx)
err := c.refreshComponentModel(ctx)
if err != nil {
c.logger.Errorf("updating Coordinator variables: %s", err.Error())
}
return nil
}

// Called on the main Coordinator goroutine.
func (c *Coordinator) processLogLevel(ctx context.Context, ll logp.Level) (err error) {
span, ctx := apm.StartSpan(ctx, "log_level", "app.internal")
defer func() {
apm.CaptureError(ctx, err).Send()
span.End()
}()

func (c *Coordinator) processLogLevel(ctx context.Context, ll logp.Level) {
c.setLogLevel(ll)

if c.ast != nil && c.vars != nil {
return c.process(ctx)
err := c.refreshComponentModel(ctx)
if err != nil {
c.logger.Errorf("updating log level: %s", err.Error())
}
return nil
}

// Regenerate the component model based on the current vars and AST, then
// forward the result to the runtime manager.
// Always called on the main Coordinator goroutine.
func (c *Coordinator) process(ctx context.Context) (err error) {
span, ctx := apm.StartSpan(ctx, "process", "app.internal")
func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {
if c.ast == nil || c.vars == nil {
// Nothing to process yet
return nil
}

span, ctx := apm.StartSpan(ctx, "refreshComponentModel", "app.internal")
defer func() {
apm.CaptureError(ctx, err).Send()
span.End()
}()

// regenerate the component model
err = c.recomputeConfigAndComponents()
err = c.generateComponentModel()
if err != nil {
return err
return fmt.Errorf("generating component model: %w", err)
}

signed, err := component.SignedFromPolicy(c.derivedConfig)
@@ -1021,18 +1052,24 @@ func (c *Coordinator) process(ctx context.Context) (err error) {
c.logger.Info("Updating running component model")
c.logger.With("components", model.Components).Debug("Updating running component model")
err = c.runtimeMgr.Update(model)
c.setRuntimeUpdateError(err)
if err != nil {
return err
return fmt.Errorf("updating runtime: %w", err)
}
c.setState(agentclient.Healthy, "Running")
c.setCoordinatorState(agentclient.Healthy, "Running")
return nil
}

// recomputeConfigAndComponents regenerates the configuration tree and
// generateComponentModel regenerates the configuration tree and
// components from the current AST and vars and returns the result.
// Called from both the main Coordinator goroutine and from external
// goroutines via diagnostics hooks.
func (c *Coordinator) recomputeConfigAndComponents() error {
func (c *Coordinator) generateComponentModel() (err error) {
defer func() {
// Update componentGenErr with the results.
c.setComponentGenError(err)
}()

ast := c.ast.Clone()
inputs, ok := transpiler.Lookup(ast, "inputs")
if ok {
(Diff for the remaining 4 changed files not shown.)
