diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c28b56a184e..3ee1edbffa3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -132,6 +132,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556] - Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488] - [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917] +- Fix a bug in cloudwatch task allocation that could skip some logs {issue}38918[38918] {pull}38953[38953] - Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38985[38985] - entity-analytics input: Improve structured logging. {pull}38990[38990] - Fix config validation for CEL and HTTPJSON inputs when using password grant authentication and `client.id` or `client.secret` are not present. {pull}38962[38962] diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go index ca54721bd27..d85480891a0 100644 --- a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go @@ -14,61 +14,69 @@ import ( awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/elastic-agent-libs/logp" ) type cloudwatchPoller struct { - numberOfWorkers int - apiSleep time.Duration + config config region string - logStreams []*string - logStreamPrefix string - startTime int64 - endTime int64 - workerSem *awscommon.Sem log *logp.Logger metrics *inputMetrics workersListingMap *sync.Map workersProcessingMap *sync.Map + + // When a worker is ready for its next task, it should + // send to workRequestChan and then read from workResponseChan. + // The worker can cancel the request based on other context + // cancellations, but if the write succeeds it _must_ read from + // workResponseChan to avoid deadlocking the main loop. + workRequestChan chan struct{} + workResponseChan chan workResponse + + workerWg sync.WaitGroup +} + +type workResponse struct { + logGroup string + startTime, endTime time.Time } func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics, - awsRegion string, apiSleep time.Duration, - numberOfWorkers int, logStreams []*string, logStreamPrefix string) *cloudwatchPoller { + awsRegion string, config config) *cloudwatchPoller { if metrics == nil { metrics = newInputMetrics("", nil) } return &cloudwatchPoller{ - numberOfWorkers: numberOfWorkers, - apiSleep: apiSleep, - region: awsRegion, - logStreams: logStreams, - logStreamPrefix: logStreamPrefix, - startTime: int64(0), - endTime: int64(0), - workerSem: awscommon.NewSem(numberOfWorkers), log: log, metrics: metrics, + region: awsRegion, + config: config, workersListingMap: new(sync.Map), workersProcessingMap: new(sync.Map), + // workRequestChan is unbuffered to guarantee that + // the worker and main loop agree whether a request + // was sent. workerResponseChan is buffered so the + // main loop doesn't have to block on the workers + // while distributing new data. + workRequestChan: make(chan struct{}), + workResponseChan: make(chan workResponse, 10), } } -func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) { +func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) { err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor) if err != nil { var errRequestCanceled *awssdk.RequestCanceledError if errors.As(err, &errRequestCanceled) { - p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err) + p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", errRequestCanceled) } p.log.Error("getLogEventsFromCloudWatch failed: ", err) } } // getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch -func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error { +func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) error { // construct FilterLogEventsInput filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup) paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput) @@ -83,8 +91,8 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents))) // This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region). - p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep) - time.Sleep(p.apiSleep) + p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.config.APISleep) + time.Sleep(p.config.APISleep) p.log.Debug("done sleeping") p.log.Debugf("Processing #%v events", len(logEvents)) @@ -93,21 +101,87 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client return nil } -func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput { +func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroup string) *cloudwatchlogs.FilterLogEventsInput { filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{ LogGroupName: awssdk.String(logGroup), - StartTime: awssdk.Int64(startTime), - EndTime: awssdk.Int64(endTime), + StartTime: awssdk.Int64(startTime.UnixNano() / int64(time.Millisecond)), + EndTime: awssdk.Int64(endTime.UnixNano() / int64(time.Millisecond)), } - if len(p.logStreams) > 0 { - for _, stream := range p.logStreams { + if len(p.config.LogStreams) > 0 { + for _, stream := range p.config.LogStreams { filterLogEventsInput.LogStreamNames = append(filterLogEventsInput.LogStreamNames, *stream) } } - if p.logStreamPrefix != "" { - filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix) + if p.config.LogStreamPrefix != "" { + filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.config.LogStreamPrefix) } return filterLogEventsInput } + +func (p *cloudwatchPoller) startWorkers( + ctx context.Context, + svc *cloudwatchlogs.Client, + logProcessor *logProcessor, +) { + for i := 0; i < p.config.NumberOfWorkers; i++ { + p.workerWg.Add(1) + go func() { + defer p.workerWg.Done() + for { + var work workResponse + select { + case <-ctx.Done(): + return + case p.workRequestChan <- struct{}{}: + work = <-p.workResponseChan + } + + p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroup) + p.run(svc, work.logGroup, work.startTime, work.endTime, logProcessor) + p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroup) + } + }() + } +} + +// receive implements the main run loop that distributes tasks to the worker +// goroutines. It accepts a "clock" callback (which on a live input should +// equal time.Now) to allow deterministic unit tests. +func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string, clock func() time.Time) { + defer p.workerWg.Wait() + // startTime and endTime are the bounds of the current scanning interval. + // If we're starting at the end of the logs, advance the start time to the + // most recent scan window + var startTime time.Time + endTime := clock().Add(-p.config.Latency) + if p.config.StartPosition == "end" { + startTime = endTime.Add(-p.config.ScanFrequency) + } + for ctx.Err() == nil { + for _, lg := range logGroupNames { + select { + case <-ctx.Done(): + return + case <-p.workRequestChan: + p.workResponseChan <- workResponse{ + logGroup: lg, + startTime: startTime, + endTime: endTime, + } + } + } + + // Delay for ScanFrequency after finishing a time span + p.log.Debugf("sleeping for %v before checking new logs", p.config.ScanFrequency) + select { + case <-time.After(p.config.ScanFrequency): + case <-ctx.Done(): + } + p.log.Debug("done sleeping") + + // Advance to the next time span + startTime, endTime = endTime, clock().Add(-p.config.Latency) + } +} diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go new file mode 100644 index 00000000000..2f8198c021d --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go @@ -0,0 +1,207 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awscloudwatch + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp" +) + +type clock struct { + time time.Time +} + +func (c *clock) now() time.Time { + return c.time +} + +type receiveTestStep struct { + expected []workResponse + nextTime time.Time +} + +type receiveTestCase struct { + name string + logGroups []string + configOverrides func(*config) + startTime time.Time + steps []receiveTestStep +} + +func TestReceive(t *testing.T) { + // We use a mocked clock so scan frequency can be any positive value. + const defaultScanFrequency = time.Microsecond + t0 := time.Time{} + t1 := t0.Add(time.Hour) + t2 := t1.Add(time.Minute) + t3 := t2.Add(time.Hour) + testCases := []receiveTestCase{ + { + name: "Default config with one log group", + logGroups: []string{"a"}, + startTime: t1, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t0, endTime: t1}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1, endTime: t2}, + }, + nextTime: t3, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t2, endTime: t3}, + }, + }, + }, + }, + { + name: "Default config with two log groups", + logGroups: []string{"a", "b"}, + startTime: t1, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t0, endTime: t1}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + // start/end times for the second log group should be the same + // even though the clock has changed. + {logGroup: "b", startTime: t0, endTime: t1}, + }, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1, endTime: t2}, + {logGroup: "b", startTime: t1, endTime: t2}, + }, + nextTime: t3, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t2, endTime: t3}, + {logGroup: "b", startTime: t2, endTime: t3}, + }, + }, + }, + }, + { + name: "One log group with start_position: end", + logGroups: []string{"a"}, + startTime: t1, + configOverrides: func(c *config) { + c.StartPosition = "end" + }, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t1.Add(-defaultScanFrequency), endTime: t1}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1, endTime: t2}, + }, + }, + }, + }, + { + name: "Two log group with start_position: end and latency", + logGroups: []string{"a", "b"}, + startTime: t1, + configOverrides: func(c *config) { + c.StartPosition = "end" + c.Latency = time.Second + }, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)}, + {logGroup: "b", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroup: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + }, + }, + }, + }, + { + name: "Three log groups with latency", + logGroups: []string{"a", "b", "c"}, + startTime: t1, + configOverrides: func(c *config) { + c.Latency = time.Second + }, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t0, endTime: t1.Add(-time.Second)}, + {logGroup: "b", startTime: t0, endTime: t1.Add(-time.Second)}, + {logGroup: "c", startTime: t0, endTime: t1.Add(-time.Second)}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroup: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroup: "c", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + }, + }, + }, + }, + } + clock := &clock{} + for stepIndex, test := range testCases { + ctx, cancel := context.WithCancel(context.Background()) + p := &cloudwatchPoller{ + workRequestChan: make(chan struct{}), + // Unlike the live cwPoller, we make workResponseChan unbuffered, + // so we can guarantee that clock updates happen when cwPoller has already + // decided on its output + workResponseChan: make(chan workResponse), + log: logp.NewLogger("test"), + } + + p.config = defaultConfig() + p.config.ScanFrequency = defaultScanFrequency + if test.configOverrides != nil { + test.configOverrides(&p.config) + } + clock.time = test.startTime + go p.receive(ctx, test.logGroups, clock.now) + for _, step := range test.steps { + for i, expected := range step.expected { + p.workRequestChan <- struct{}{} + if i+1 == len(step.expected) && !step.nextTime.Equal(time.Time{}) { + // On the last request of the step, we advance the clock if a + // time is set + clock.time = step.nextTime + } + response := <-p.workResponseChan + assert.Equalf(t, expected, response, "%v: step %v response %v doesn't match", test.name, stepIndex, i) + } + } + cancel() + } +} diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 4ee9daa05ad..75f22e6625a 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -6,10 +6,8 @@ package awscloudwatch import ( "context" - "errors" "fmt" "strings" - "sync" "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" @@ -137,82 +135,12 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) log.Named("cloudwatch_poller"), in.metrics, in.awsConfig.Region, - in.config.APISleep, - in.config.NumberOfWorkers, - in.config.LogStreams, - in.config.LogStreamPrefix) + in.config) logProcessor := newLogProcessor(log.Named("log_processor"), in.metrics, client, ctx) cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroupNames))) - return in.Receive(svc, cwPoller, ctx, logProcessor, logGroupNames) -} - -func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwatchPoller, ctx context.Context, logProcessor *logProcessor, logGroupNames []string) error { - // This loop tries to keep the workers busy as much as possible while - // honoring the number in config opposed to a simpler loop that does one - // listing, sequentially processes every object and then does another listing - start := true - workerWg := new(sync.WaitGroup) - lastLogGroupOffset := 0 - for ctx.Err() == nil { - if !start { - cwPoller.log.Debugf("sleeping for %v before checking new logs", in.config.ScanFrequency) - time.Sleep(in.config.ScanFrequency) - cwPoller.log.Debug("done sleeping") - } - start = false - - currentTime := time.Now() - cwPoller.startTime, cwPoller.endTime = getStartPosition(in.config.StartPosition, currentTime, cwPoller.endTime, in.config.ScanFrequency, in.config.Latency) - cwPoller.log.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(cwPoller.startTime/1000, 0), time.Unix(cwPoller.endTime/1000, 0)) - availableWorkers, err := cwPoller.workerSem.AcquireContext(in.config.NumberOfWorkers, ctx) - if err != nil { - break - } - - if availableWorkers == 0 { - continue - } - - workerWg.Add(availableWorkers) - logGroupNamesLength := len(logGroupNames) - runningGoroutines := 0 - - for i := lastLogGroupOffset; i < logGroupNamesLength; i++ { - if runningGoroutines >= availableWorkers { - break - } - - runningGoroutines++ - lastLogGroupOffset = i + 1 - if lastLogGroupOffset >= logGroupNamesLength { - // release unused workers - cwPoller.workerSem.Release(availableWorkers - runningGoroutines) - for j := 0; j < availableWorkers-runningGoroutines; j++ { - workerWg.Done() - } - lastLogGroupOffset = 0 - } - - lg := logGroupNames[i] - go func(logGroup string, startTime int64, endTime int64) { - defer func() { - cwPoller.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroup) - workerWg.Done() - cwPoller.workerSem.Release(1) - }() - cwPoller.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroup) - cwPoller.run(svc, logGroup, startTime, endTime, logProcessor) - }(lg, cwPoller.startTime, cwPoller.endTime) - } - } - - // Wait for all workers to finish. - workerWg.Wait() - if errors.Is(ctx.Err(), context.Canceled) { - // A canceled context is a normal shutdown. - return nil - } - return ctx.Err() + cwPoller.startWorkers(ctx, svc, logProcessor) + cwPoller.receive(ctx, logGroupNames, time.Now) + return nil } func parseARN(logGroupARN string) (string, string, error) { @@ -256,24 +184,3 @@ func getLogGroupNames(svc *cloudwatchlogs.Client, logGroupNamePrefix string, log } return logGroupNames, nil } - -func getStartPosition(startPosition string, currentTime time.Time, endTime int64, scanFrequency time.Duration, latency time.Duration) (int64, int64) { - if latency != 0 { - // add latency if config is not 0 - currentTime = currentTime.Add(latency * -1) - } - - switch startPosition { - case "beginning": - if endTime != int64(0) { - return endTime, currentTime.UnixNano() / int64(time.Millisecond) - } - return 0, currentTime.UnixNano() / int64(time.Millisecond) - case "end": - if endTime != int64(0) { - return endTime, currentTime.UnixNano() / int64(time.Millisecond) - } - return currentTime.Add(-scanFrequency).UnixNano() / int64(time.Millisecond), currentTime.UnixNano() / int64(time.Millisecond) - } - return 0, 0 -} diff --git a/x-pack/filebeat/input/awscloudwatch/input_test.go b/x-pack/filebeat/input/awscloudwatch/input_test.go index c51c6a072f4..4f9754c6a13 100644 --- a/x-pack/filebeat/input/awscloudwatch/input_test.go +++ b/x-pack/filebeat/input/awscloudwatch/input_test.go @@ -15,109 +15,6 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" ) -func TestGetStartPosition(t *testing.T) { - currentTime := time.Date(2020, time.June, 1, 0, 0, 0, 0, time.UTC) - cases := []struct { - title string - startPosition string - prevEndTime int64 - scanFrequency time.Duration - latency time.Duration - expectedStartTime int64 - expectedEndTime int64 - }{ - { - "startPosition=beginning", - "beginning", - int64(0), - 30 * time.Second, - 0, - int64(0), - int64(1590969600000), - }, - { - "startPosition=end", - "end", - int64(0), - 30 * time.Second, - 0, - int64(1590969570000), - int64(1590969600000), - }, - { - "startPosition=typo", - "typo", - int64(0), - 30 * time.Second, - 0, - int64(0), - int64(0), - }, - { - "startPosition=beginning with prevEndTime", - "beginning", - int64(1590000000000), - 30 * time.Second, - 0, - int64(1590000000000), - int64(1590969600000), - }, - { - "startPosition=end with prevEndTime", - "end", - int64(1590000000000), - 30 * time.Second, - 0, - int64(1590000000000), - int64(1590969600000), - }, - { - "startPosition=beginning with latency", - "beginning", - int64(0), - 30 * time.Second, - 10 * time.Minute, - int64(0), - int64(1590969000000), - }, - { - "startPosition=beginning with prevEndTime and latency", - "beginning", - int64(1590000000000), - 30 * time.Second, - 10 * time.Minute, - int64(1590000000000), - int64(1590969000000), - }, - { - "startPosition=end with latency", - "end", - int64(0), - 30 * time.Second, - 10 * time.Minute, - int64(1590968970000), - int64(1590969000000), - }, - { - "startPosition=end with prevEndTime and latency", - "end", - int64(1590000000000), - 30 * time.Second, - 10 * time.Minute, - int64(1590000000000), - int64(1590969000000), - }, - } - - for _, c := range cases { - t.Run(c.title, func(t *testing.T) { - startTime, endTime := getStartPosition(c.startPosition, currentTime, c.prevEndTime, c.scanFrequency, c.latency) - assert.Equal(t, c.expectedStartTime, startTime) - assert.Equal(t, c.expectedEndTime, endTime) - }) - } -} - func TestCreateEvent(t *testing.T) { logEvent := &types.FilteredLogEvent{ EventId: awssdk.String("id-1"),