diff --git a/filebeat/input/v2/input.go b/filebeat/input/v2/input.go index f816e285eb3..cdfde85c846 100644 --- a/filebeat/input/v2/input.go +++ b/filebeat/input/v2/input.go @@ -18,6 +18,9 @@ package v2 import ( + "context" + "time" + "github.com/elastic/beats/v7/libbeat/beat" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -111,3 +114,19 @@ type Canceler interface { Done() <-chan struct{} Err() error } + +type cancelerCtx struct { + Canceler +} + +func GoContextFromCanceler(c Canceler) context.Context { + return cancelerCtx{c} +} + +func (c cancelerCtx) Deadline() (deadline time.Time, ok bool) { + return time.Time{}, false +} + +func (c cancelerCtx) Value(_ any) any { + return nil +} diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 80ed3f2b2bf..f274fb5fcc9 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -94,19 +94,7 @@ func (in *cloudwatchInput) Test(ctx v2.TestContext) error { } func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) error { - var err error - - // Wrap input Context's cancellation Done channel a context.Context. This - // goroutine stops with the parent closes the Done channel. - ctx, cancelInputCtx := context.WithCancel(context.Background()) - go func() { - defer cancelInputCtx() - select { - case <-inputContext.Cancelation.Done(): - case <-ctx.Done(): - } - }() - defer cancelInputCtx() + ctx := v2.GoContextFromCanceler(inputContext.Cancelation) // Create client for publishing events and receive notification of their ACKs. client, err := pipeline.ConnectWith(beat.ClientConfig{}) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 855403e5dc4..733de949f29 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -114,17 +114,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { return fmt.Errorf("can not start persistent store: %w", err) } - // Wrap input Context's cancellation Done channel a context.Context. This - // goroutine stops with the parent closes the Done channel. - ctx, cancelInputCtx := context.WithCancel(context.Background()) - go func() { - defer cancelInputCtx() - select { - case <-inputContext.Cancelation.Done(): - case <-ctx.Done(): - } - }() - defer cancelInputCtx() + ctx := v2.GoContextFromCanceler(inputContext.Cancelation) if in.config.QueueURL != "" { regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName)