From 73c6b25b6804db5a02ab5ae79c0b24e16d897051 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 23 Apr 2024 12:30:55 -0400 Subject: [PATCH] Cleanup: Replace custom goroutine shutdown listeners with shared context wrapper (#38957) Simplify context handling in the awscloudwatch and awss3 inputs. The awscloudwatch and awss3 inputs both need to convert their Beats input context to a standard golang context for use with the AWS API, and both of them use custom wrappers that create an extra goroutine to listen for Beat shutdown. I added an explicit wrapper object to the `v2.InputContext` API that is a single function call and requires no extra goroutines, and replaced the context wrapper in both inputs. This doesn't change any functional behavior. --- filebeat/input/v2/input.go | 19 +++++++++++++++++++ x-pack/filebeat/input/awscloudwatch/input.go | 14 +------------- x-pack/filebeat/input/awss3/input.go | 12 +----------- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/filebeat/input/v2/input.go b/filebeat/input/v2/input.go index f816e285eb3..cdfde85c846 100644 --- a/filebeat/input/v2/input.go +++ b/filebeat/input/v2/input.go @@ -18,6 +18,9 @@ package v2 import ( + "context" + "time" + "github.com/elastic/beats/v7/libbeat/beat" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -111,3 +114,19 @@ type Canceler interface { Done() <-chan struct{} Err() error } + +type cancelerCtx struct { + Canceler +} + +func GoContextFromCanceler(c Canceler) context.Context { + return cancelerCtx{c} +} + +func (c cancelerCtx) Deadline() (deadline time.Time, ok bool) { + return time.Time{}, false +} + +func (c cancelerCtx) Value(_ any) any { + return nil +} diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 80ed3f2b2bf..f274fb5fcc9 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -94,19 +94,7 @@ func (in *cloudwatchInput) Test(ctx v2.TestContext) error { } func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) error { - var err error - - // Wrap input Context's cancellation Done channel a context.Context. This - // goroutine stops with the parent closes the Done channel. - ctx, cancelInputCtx := context.WithCancel(context.Background()) - go func() { - defer cancelInputCtx() - select { - case <-inputContext.Cancelation.Done(): - case <-ctx.Done(): - } - }() - defer cancelInputCtx() + ctx := v2.GoContextFromCanceler(inputContext.Cancelation) // Create client for publishing events and receive notification of their ACKs. client, err := pipeline.ConnectWith(beat.ClientConfig{}) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 855403e5dc4..733de949f29 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -114,17 +114,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { return fmt.Errorf("can not start persistent store: %w", err) } - // Wrap input Context's cancellation Done channel a context.Context. This - // goroutine stops with the parent closes the Done channel. - ctx, cancelInputCtx := context.WithCancel(context.Background()) - go func() { - defer cancelInputCtx() - select { - case <-inputContext.Cancelation.Done(): - case <-ctx.Done(): - } - }() - defer cancelInputCtx() + ctx := v2.GoContextFromCanceler(inputContext.Cancelation) if in.config.QueueURL != "" { regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName)