use poll timeout in es ctx #3986

Draft · wants to merge 4 commits into base: main · Changes from 1 commit
5 changes: 4 additions & 1 deletion internal/pkg/api/handleCheckin.go
@@ -337,6 +337,9 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
actions, ackToken = convertActions(zlog, agent.Id, pendingActions)

span, ctx := apm.StartSpan(r.Context(), "longPoll", "process")
ctx, cancel := context.WithTimeout(ctx, pollDuration)

Member:

Interesting: evaluating the timeouts and lifetimes of all the requests in here is actually challenging, and I'm not sure they are right.

This context is not obviously tied to the actual network requests. Also, at this point we are past auth, which should probably also respect a timeout, and it's not clear that it is tied to the poll duration either.

What this does is cause us to hit the ctx.Done() block below, which triggers the ct.writeResponse call. That call is a network operation that should also have a timeout, but it can't be tied to this context because the context has already expired, so we'd need a different one.
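
A minimal sketch of the kind of separate write deadline that would be needed there, assuming a hypothetical writeTimeout value; this is not something the PR adds:

// Sketch only: the poll context has already hit its deadline, so derive a
// fresh short-lived context for the final write instead of reusing ctx.
writeCtx, writeCancel := context.WithTimeout(context.Background(), writeTimeout) // writeTimeout is hypothetical
defer writeCancel()
// writeCtx would then have to be threaded into ct.writeResponse, which today
// only sees the request (and its expired context) via r.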

Member:

Looking closer, this context deadline actually changes the checkin logic without setting a deadline on any of the underlying network operations or interactions with ES.

There is already a ticker for the long poll duration:

// Chill out for a bit. Long poll.
longPoll := time.NewTicker(pollDuration)
defer longPoll.Stop()

It causes us to hit the CheckIn method here:

case <-tick.C:
	err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason, false)
	if err != nil {
		zlog.Error().Err(err).Str(logger.AgentID, agent.Id).Msg("checkin failed")
	}
}

So all setting this context deadline does is get us to this block, but only if we aren't already in the CheckIn method:

case <-ctx.Done():
	defer span.End()
	// If the request context is canceled, the API server is shutting down.
	// We want to immediately stop the long-poll and return a 200 with the ackToken and no actions.
	if errors.Is(ctx.Err(), context.Canceled) {
		resp := CheckinResponse{
			AckToken: &ackToken,
			Action:   "checkin",
		}
		return ct.writeResponse(zlog, w, r, agent, resp)
	}
	return ctx.Err()

If both <-ctx.Done() and <-tick.C are pending at the same time, the Go runtime will randomly choose which case is taken, so the behavior here isn't even deterministic.
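
As a standalone illustration of that last point (not code from this repo): when more than one select case is ready, Go picks one pseudo-randomly.

package main

import "fmt"

func main() {
	a := make(chan struct{}, 1)
	b := make(chan struct{}, 1)
	a <- struct{}{}
	b <- struct{}{}

	// Both cases are ready here, so the runtime chooses one at random;
	// running this repeatedly flips between the two outputs.
	select {
	case <-a:
		fmt.Println("took <-a (think <-ctx.Done())")
	case <-b:
		fmt.Println("took <-b (think <-tick.C)")
	}
}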

Contributor Author:

Thanks for looking into it. Do you think there is a way to enforce the pollDuration on the underlying network requests? Maybe someone from the Control Plane team can spend some time on this; it seems to be the reason some drones are getting stuck in a failed checkin state.

Member:

Looking closer, if we are in this loop, the only place where we interact directly with ES is in processPolicy, and putting the deadline on the context solves that case:

actionResp, err := processPolicy(ctx, zlog, ct.bulker, agent.Id, policy)

If I follow this all the way down the call stack to where the actual search call happens, the context should prevent us from waiting forever for a response:

func (b *Bulker) dispatch(ctx context.Context, blk *bulkT) respT {
	start := time.Now()
	// Dispatch to bulk Run loop
	select {
	case b.ch <- blk:
	case <-ctx.Done():

The context that is actually on the underlying ES network request is the one in the bulker Run function:

if err := b.flushQueue(ctx, w, *q); err != nil {

The context for this appears to just be tied to context.Background in multiple places:

// Bulker is started in its own context and managed in the scope of this function. This is done so
// when the `ctx` is cancelled, the bulker will remain executing until this function exits.
// This allows the child subsystems to continue to write to the data store while tearing down.
bulkCtx, bulkCancel := context.WithCancel(context.Background())
defer bulkCancel()

bulkCtx, bulkCancel := context.WithCancel(context.Background())
es, err := b.createRemoteEsClient(bulkCtx, outputName, outputMap)
if err != nil {
	defer bulkCancel()
	return nil, hasConfigChanged, err
}
// starting a new bulker to create/update API keys for remote ES output
newBulker := NewBulker(es, b.tracer)
newBulker.cancelFn = bulkCancel
b.updateBulkerMap(outputName, newBulker)
errCh := make(chan error)
go func() {
	runFunc := func() (err error) {
		zlog.Debug().Str(logger.PolicyOutputName, outputName).Msg("Bulker started")
		return newBulker.Run(bulkCtx)

It does look like there may be a default 90s timeout on the underlying ES client, but I don't see this actually being called anywhere (possible I missed it):

func (c *Elasticsearch) InitDefaults() {
	c.Protocol = schemeHTTP
	c.Hosts = []string{"localhost:9200"}
	c.Timeout = 90 * time.Second

There are lots of places that could be the problem; TBH I'd just add more logging or spans until we definitively know exactly where we get stuck when these 28+ minute checkins happen.
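
For what it's worth, a sketch of the kind of extra logging meant here, wrapped around the processPolicy call in the long-poll loop; the field names are made up and this is not part of the PR:

// Hypothetical instrumentation: time processPolicy so slow or stuck checkins
// show up in the logs with the agent ID attached.
pStart := time.Now()
actionResp, err := processPolicy(ctx, zlog, ct.bulker, agent.Id, policy)
zlog.Debug().
	Str(logger.AgentID, agent.Id).
	Dur("processPolicy.took", time.Since(pStart)).
	Err(err).
	Msg("processPolicy returned")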

Member:

> Maybe someone from the Control Plane team can spend some time on this; it seems to be the reason some drones are getting stuck in a failed checkin state.

For now that person is me; let's avoid context switching someone else in while we narrow down what is actually wrong and can evaluate the effort to fix it.

Contributor Author (@juliaElastic, Oct 9, 2024):

We could log the actual timeout being used, to see whether the default is applied. It could be logged out here:

Int("cluster.maxConnsPersHost", mcph).

Dur("cluster.timeout", cfg.Output.Elasticsearch.Timeout).

I logged it out and it seems to be 90s as defined.
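
Put together it would look roughly like this; the surrounding Info() chain and message are assumptions based on the existing maxConnsPersHost field, not the actual code:

// Hypothetical: log the effective timeout next to the existing
// connection-pool field so the value is visible at startup.
zlog.Info().
	Int("cluster.maxConnsPersHost", mcph).
	Dur("cluster.timeout", cfg.Output.Elasticsearch.Timeout).
	Msg("elasticsearch output configuration")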

defer cancel()

if len(actions) == 0 {
LOOP:
for {
@@ -368,7 +371,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
actions = append(actions, *actionResp)
break LOOP
case <-longPoll.C:
zlog.Trace().Msg("fire long poll")
zlog.Debug().Str(logger.AgentID, agent.Id).Msg("fire long poll")
break LOOP
case <-tick.C:
err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason, false)

Contributor:

It doesn't seem likely that it gets stuck here, as ct.bc.CheckIn just grabs a lock and adds to a map. But again, it's possible that there is a deadlock here and that the lock is held and never freed, so it shouldn't be ruled out.
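
For context, a simplified sketch of the "grabs a lock and adds to a map" pattern being described (not the actual Bulk checkin code); a hang here would require some other holder of the mutex to never release it:

package checkin // illustrative only

import "sync"

type pendingData struct{ status, message string }

type checkinBuffer struct {
	mu      sync.Mutex
	pending map[string]pendingData
}

// CheckIn only takes the mutex and records the pending state, so it should
// return almost immediately unless mu is held elsewhere and never unlocked.
func (b *checkinBuffer) CheckIn(id, status, message string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending[id] = pendingData{status: status, message: message}
	return nil
}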
