use poll timeout in es ctx #3986
base: main
Conversation
This pull request does not have a backport label. Could you fix it @juliaElastic? 🙏
internal/pkg/api/handleCheckin.go (Outdated)

    @@ -337,6 +337,9 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
        actions, ackToken = convertActions(zlog, agent.Id, pendingActions)

        span, ctx := apm.StartSpan(r.Context(), "longPoll", "process")
        ctx, cancel := context.WithTimeout(ctx, pollDuration)
Interesting. Evaluating the timeouts and lifetimes of all the requests in here is actually challenging, and I'm not sure they are right.
This context is not obviously tied to the actual network requests. Also, at this point we are past auth, which should probably also respect a timeout, and it's not clear that it is tied to the poll duration either.
What this does is cause us to hit the ctx.Done() block below, which triggers the ct.writeResponse call. That call is a network operation that should also have a timeout, but it can't be tied to this context because it has already expired, so we'd need a different one.
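A minimal sketch of the kind of fix that would need, assuming a hypothetical writeResponse variant that accepts a context (the real call shown later in this thread does not take one); the 30s value is purely illustrative:

    // Hypothetical sketch, not fleet-server code: give the response write its
    // own bounded lifetime instead of reusing the expired long-poll context.
    writeCtx, cancelWrite := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancelWrite()
    if err := ct.writeResponseCtx(writeCtx, zlog, w, r, agent, resp); err != nil {
        zlog.Error().Err(err).Msg("failed to write checkin response")
    }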
Looking closer, this context deadline actually changes the checkin logic without setting a deadline on any of the underlying network operations or interactions with ES.
There is already a ticker for the long poll duration:
fleet-server/internal/pkg/api/handleCheckin.go, lines 316 to 319 in 7ecbda1:

    // Chill out for a bit. Long poll.
    longPoll := time.NewTicker(pollDuration)
    defer longPoll.Stop()
It causes us to hit the CheckIn method here:
fleet-server/internal/pkg/api/handleCheckin.go, lines 374 to 379 in 7ecbda1:

    case <-tick.C:
        err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason, false)
        if err != nil {
            zlog.Error().Err(err).Str(logger.AgentID, agent.Id).Msg("checkin failed")
        }
    }
So all setting this context deadline does is get us to this block, but only if we aren't already in the CheckIn method:
fleet-server/internal/pkg/api/handleCheckin.go, lines 345 to 356 in 7ecbda1:

    case <-ctx.Done():
        defer span.End()
        // If the request context is canceled, the API server is shutting down.
        // We want to immediately stop the long-poll and return a 200 with the ackToken and no actions.
        if errors.Is(ctx.Err(), context.Canceled) {
            resp := CheckinResponse{
                AckToken: &ackToken,
                Action:   "checkin",
            }
            return ct.writeResponse(zlog, w, r, agent, resp)
        }
        return ctx.Err()
If both <-ctx.Done() and <-tick.C are pending at the same time, the Go runtime will randomly choose which case is taken, so the behavior here isn't even deterministic.
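A standalone illustration of that non-determinism (not fleet-server code): when two select cases are ready at the same moment, the runtime picks one pseudo-randomly.

    package main

    import "fmt"

    func main() {
        done := make(chan struct{})
        tick := make(chan struct{})
        close(done) // a closed channel is always ready to receive from
        close(tick)

        counts := map[string]int{}
        for i := 0; i < 1000; i++ {
            // Both cases are ready, so the runtime picks one pseudo-randomly.
            select {
            case <-done:
                counts["done"]++
            case <-tick:
                counts["tick"]++
            }
        }
        fmt.Println(counts) // e.g. map[done:512 tick:488]
    }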
Thanks for looking into it. Do you think there is a way to enforce the pollDuration on the underlying network requests? Maybe someone from the Control Plane team can spend some time on this; it seems to be the reason some drones are stuck in a failed checkin state.
Looking closer, if we are in this loop, the only place where we interact directly with ES is in processPolicy, and putting the deadline on the context solves that case:

    actionResp, err := processPolicy(ctx, zlog, ct.bulker, agent.Id, policy)
If I follow this all the way down the call stack to where the actual search call happens, the context should prevent us from waiting forever for a response:
fleet-server/internal/pkg/bulk/engine.go, lines 568 to 574 in 7ecbda1:

    func (b *Bulker) dispatch(ctx context.Context, blk *bulkT) respT {
        start := time.Now()

        // Dispatch to bulk Run loop
        select {
        case b.ch <- blk:
        case <-ctx.Done():
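To see why a deadline on the caller's context matters at this dispatch point, here is a standalone sketch (not fleet-server code) of a select that would otherwise block forever on the channel send:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    func main() {
        ch := make(chan int) // unbuffered and never drained, so the send blocks forever

        ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
        defer cancel()

        select {
        case ch <- 1:
            fmt.Println("dispatched")
        case <-ctx.Done():
            // Without the deadline this select would never return.
            fmt.Println("gave up:", ctx.Err()) // context deadline exceeded
        }
    }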
The context that is actually used for the underlying ES network request is in the bulker Run function:
fleet-server/internal/pkg/bulk/engine.go, line 356 in 7ecbda1:

    if err := b.flushQueue(ctx, w, *q); err != nil {
The context for this appears to just be tied to context.Background in multiple places:
fleet-server/internal/pkg/server/fleet.go, lines 374 to 379 in 7ecbda1:

    // Bulker is started in its own context and managed in the scope of this function. This is done so
    // when the `ctx` is cancelled, the bulker will remain executing until this function exits.
    // This allows the child subsystems to continue to write to the data store while tearing down.
    bulkCtx, bulkCancel := context.WithCancel(context.Background())
    defer bulkCancel()
fleet-server/internal/pkg/bulk/engine.go, lines 166 to 182 in 7ecbda1:

    bulkCtx, bulkCancel := context.WithCancel(context.Background())
    es, err := b.createRemoteEsClient(bulkCtx, outputName, outputMap)
    if err != nil {
        defer bulkCancel()
        return nil, hasConfigChanged, err
    }
    // starting a new bulker to create/update API keys for remote ES output
    newBulker := NewBulker(es, b.tracer)
    newBulker.cancelFn = bulkCancel
    b.updateBulkerMap(outputName, newBulker)

    errCh := make(chan error)
    go func() {
        runFunc := func() (err error) {
            zlog.Debug().Str(logger.PolicyOutputName, outputName).Msg("Bulker started")
            return newBulker.Run(bulkCtx)
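A standalone reminder of what that implies (not fleet-server code): a context derived from context.Background is unaffected by the request-scoped context being cancelled, so request deadlines never reach work running under it.

    package main

    import (
        "context"
        "fmt"
    )

    func main() {
        // Request-scoped context, as an HTTP handler would have.
        reqCtx, cancelReq := context.WithCancel(context.Background())

        // Bulker-style context derived from Background, detached from the request.
        bulkCtx, cancelBulk := context.WithCancel(context.Background())
        defer cancelBulk()

        cancelReq() // simulate the request ending (disconnect, deadline, ...)

        fmt.Println("request ctx:", reqCtx.Err())  // context canceled
        fmt.Println("bulk ctx:   ", bulkCtx.Err()) // <nil> -- unaffected
    }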
It does look like there may be a default 90s timeout on the underlying ES client, but I don't see this actually being called anywhere (possible I missed it):
fleet-server/internal/pkg/config/output.go, lines 59 to 62 in 7ecbda1:

    func (c *Elasticsearch) InitDefaults() {
        c.Protocol = schemeHTTP
        c.Hosts = []string{"localhost:9200"}
        c.Timeout = 90 * time.Second
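For reference, a config value like this only has an effect if it is wired into the HTTP client used for ES requests. The sketch below is a generic, hypothetical helper showing where such a timeout would normally be attached; whether and where fleet-server actually does this is exactly the open question above.

    import (
        "net/http"
        "time"
    )

    // newESHTTPClient is illustrative, not fleet-server code: a client-level
    // timeout puts a hard upper bound on every request, independent of any
    // context deadlines the caller may or may not set.
    func newESHTTPClient(timeout time.Duration) *http.Client {
        if timeout == 0 {
            timeout = 90 * time.Second // mirrors the InitDefaults value above
        }
        return &http.Client{Timeout: timeout}
    }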
There are lots of places that could be the problem. TBH I'd just add more logging or spans until we know definitively where we get stuck when these 28+ minute checkins happen.
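For instance, a sketch of that kind of instrumentation (placement and names are taken from the snippets above; this is illustrative, not an actual patch):

    // Illustrative only: wrap the suspected ES interaction in its own span and
    // log its duration, so 28+ minute checkins can be attributed to a call.
    span, spanCtx := apm.StartSpan(ctx, "processPolicy", "es")
    start := time.Now()
    actionResp, err := processPolicy(spanCtx, zlog, ct.bulker, agent.Id, policy)
    span.End()
    zlog.Debug().
        Dur("elapsed", time.Since(start)).
        Err(err).
        Str(logger.AgentID, agent.Id).
        Msg("processPolicy finished")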
> Maybe someone from the Control Plane team can spend some time on this; it seems to be the reason some drones are stuck in a failed checkin state.

For now that person is me; let's avoid context-switching someone else in while we narrow down what is actually wrong and can evaluate the effort to fix it.
We could log the actual timeout used, to see whether the default is applied. It could be logged here:
fleet-server/internal/pkg/es/client.go, line 39 in 7ecbda1:

    Int("cluster.maxConnsPersHost", mcph).
Dur("cluster.timeout", cfg.Output.Elasticsearch.Timeout).
    @@ -337,15 +337,20 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
        actions, ackToken = convertActions(zlog, agent.Id, pendingActions)

        span, ctx := apm.StartSpan(r.Context(), "longPoll", "process")
        // ctx, cancel := context.WithTimeout(ctx, pollDuration)
        // defer cancel()

        if len(actions) == 0 {
        LOOP:
            for {
                select {
                case <-ctx.Done():
Currently, looking at this, the only time this case would be hit is if the client closes their connection to Fleet Server, since span, ctx := apm.StartSpan(r.Context(), ...) is being used as the context here; this section then writes the response. Looking at this code, it shouldn't even write the response: if the context is cancelled, that means the client is no longer connected.
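One way to make that distinction explicit, sketched here under the assumption that a deadline is (re)introduced on the context rather than as a tested change, is to branch on the specific context error instead of treating every ctx.Done() the same:

    case <-ctx.Done():
        if errors.Is(ctx.Err(), context.DeadlineExceeded) {
            // Our own long-poll deadline fired; the client is presumably still
            // connected, so returning the empty checkin response is meaningful.
        } else {
            // context.Canceled: the client disconnected or the server is
            // shutting down, so writing a response buys us nothing.
        }
        return ctx.Err()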
The logic of returning CheckinResponse here was added in this PR: https://github.com/elastic/fleet-server/pull/3165/files#diff-e0c02bac8d151e9941eedd5ef643441665ee3d2f78baf42c121edd45dee08ded
Can the context be cancelled only by the client here? I'm wondering, if the writeResponse is successful, is it correct to return the AckToken without actions? I'm trying to find where the AckToken is persisted back to ES.
I think it's persisted in the action_seq_no field on a successful checkin here:
fleet-server/internal/pkg/checkin/bulk.go, line 263 in 3fd2a48:

    fields[dl.FieldActionSeqNo] = pendingData.extra.seqNo
I'm wondering if there is any retry if the agent /acks request fails or fleet-server fails to persist the action result. I've seen some stuck upgrades where the ack failed and was never retried.
Though when testing this locally with a simulated error instead of writing the action result, I do see the retries happening.
    @@ -360,6 +365,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
        actions = append(actions, acs...)
        break LOOP
    case policy := <-sub.Output():
        zlog.Debug().Str(logger.AgentID, agent.Id).Msg("SCALEDEBUG new policy")
        actionResp, err := processPolicy(ctx, zlog, ct.bulker, agent.Id, policy)
It's possible that it gets stuck here processing the policy, as it doesn't create its own context with a timeout; it is still using the same context as the request connection.
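If that turns out to be the culprit, one possible mitigation, sketched here rather than proposed as a tested change, is to give the policy processing its own deadline derived from the request context (the 30s value is illustrative):

    // Hypothetical: bound the ES-backed policy processing separately from the
    // lifetime of the request connection.
    procCtx, cancelProc := context.WithTimeout(ctx, 30*time.Second)
    actionResp, err := processPolicy(procCtx, zlog, ct.bulker, agent.Id, policy)
    cancelProc()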
    @@ -368,7 +374,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
        actions = append(actions, *actionResp)
        break LOOP
    case <-longPoll.C:
    -   zlog.Trace().Msg("fire long poll")
    +   zlog.Debug().Str(logger.AgentID, agent.Id).Msg("fire long poll")
        break LOOP
    case <-tick.C:
        err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason, false)
It doesn't seem likely that it gets stuck here, as ct.bc.CheckIn just grabs a lock and adds to a map. But it could be possible that there is a deadlock here and that lock is held and never freed; it shouldn't be ruled out.
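For reference, the pattern being described is roughly the following (a paraphrase of what bc.CheckIn is said to do, not the actual implementation); a hang here would require some other code path to hold the same mutex and never release it:

    // Paraphrased sketch. pendingData and the field layout are illustrative.
    type pendingCheckins struct {
        mu      sync.Mutex
        pending map[string]pendingData // keyed by agent ID
    }

    func (p *pendingCheckins) add(agentID string, d pendingData) {
        p.mu.Lock()
        defer p.mu.Unlock()
        p.pending[agentID] = d
        // This can only block indefinitely if another goroutine holds mu while
        // itself being blocked on something that never completes.
    }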
The long poll issue is reproduced again here: https://github.com/elastic/ingest-dev/issues/3783#issuecomment-2429301669
What is the problem this PR solves?
// Please do not just reference an issue. Explain WHAT the problem this PR solves here.
How does this PR solve the problem?
// Explain HOW you solved the problem in your code. It is possible that during PR reviews this changes and then this section should be updated.
How to test this PR locally
Design Checklist
Checklist
./changelog/fragments using the changelog tool
Related issues