Skip to content

Commit

Permalink
Cleanup: Replace custom goroutine shutdown listeners with shared cont…
Browse files Browse the repository at this point in the history
…ext wrapper (#38957)

Simplify context handling in the awscloudwatch and awss3 inputs.

The awscloudwatch and awss3 inputs both need to convert their Beats input context to a standard golang context for use with the AWS API, and both of them use custom wrappers that create an extra goroutine to listen for Beat shutdown. I added an explicit wrapper object to the `v2.InputContext` API that is a single function call and requires no extra goroutines, and replaced the context wrapper in both inputs.

This doesn't change any functional behavior.
  • Loading branch information
faec authored Apr 23, 2024
1 parent 38f49a9 commit 73c6b25
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 24 deletions.
19 changes: 19 additions & 0 deletions filebeat/input/v2/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package v2

import (
"context"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -111,3 +114,19 @@ type Canceler interface {
Done() <-chan struct{}
Err() error
}

type cancelerCtx struct {
Canceler
}

func GoContextFromCanceler(c Canceler) context.Context {
return cancelerCtx{c}
}

func (c cancelerCtx) Deadline() (deadline time.Time, ok bool) {
return time.Time{}, false
}

func (c cancelerCtx) Value(_ any) any {
return nil
}
14 changes: 1 addition & 13 deletions x-pack/filebeat/input/awscloudwatch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,7 @@ func (in *cloudwatchInput) Test(ctx v2.TestContext) error {
}

func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
var err error

// Wrap input Context's cancellation Done channel a context.Context. This
// goroutine stops with the parent closes the Done channel.
ctx, cancelInputCtx := context.WithCancel(context.Background())
go func() {
defer cancelInputCtx()
select {
case <-inputContext.Cancelation.Done():
case <-ctx.Done():
}
}()
defer cancelInputCtx()
ctx := v2.GoContextFromCanceler(inputContext.Cancelation)

// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{})
Expand Down
12 changes: 1 addition & 11 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
return fmt.Errorf("can not start persistent store: %w", err)
}

// Wrap input Context's cancellation Done channel a context.Context. This
// goroutine stops with the parent closes the Done channel.
ctx, cancelInputCtx := context.WithCancel(context.Background())
go func() {
defer cancelInputCtx()
select {
case <-inputContext.Cancelation.Done():
case <-ctx.Done():
}
}()
defer cancelInputCtx()
ctx := v2.GoContextFromCanceler(inputContext.Cancelation)

if in.config.QueueURL != "" {
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName)
Expand Down

0 comments on commit 73c6b25

Please sign in to comment.