diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d2c82dfbf64..bede1806694 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -105,6 +105,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Ensure winlog input retains metric collection when handling recoverable errors. {issue}36479[36479] {pull}36483[36483] - Revert error introduced in {pull}35734[35734] when symlinks can't be resolved in filestream. {pull}36557[36557] - Fix ignoring external input configuration in `take_over: true` mode {issue}36378[36378] {pull}36395[36395] +- Enable awscloudwatch input to collect logs from linked source accounts {pull}36645[36645] *Heartbeat* diff --git a/NOTICE.txt b/NOTICE.txt index bc45b225aef..79fdd170deb 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -5025,11 +5025,11 @@ Contents of probable licence file $GOMODCACHE/github.com/aws/aws-lambda-go@v1.13 -------------------------------------------------------------------------------- Dependency : github.com/aws/aws-sdk-go-v2 -Version: v1.18.0 +Version: v1.21.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go-v2@v1.18.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go-v2@v1.21.0/LICENSE.txt: Apache License @@ -6509,11 +6509,11 @@ Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go-v2/servi -------------------------------------------------------------------------------- Dependency : github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs -Version: v1.15.5 +Version: v1.24.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs@v1.15.5/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs@v1.24.0/LICENSE.txt: Apache License @@ -9053,11 +9053,11 @@ Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go-v2/servi -------------------------------------------------------------------------------- Dependency : github.com/aws/smithy-go -Version: v1.13.5 +Version: v1.14.2 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/aws/smithy-go@v1.13.5/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/aws/smithy-go@v1.14.2/LICENSE: Apache License @@ -31952,11 +31952,11 @@ Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go-v2/aws/p -------------------------------------------------------------------------------- Dependency : github.com/aws/aws-sdk-go-v2/internal/configsources -Version: v1.1.33 +Version: v1.1.41 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go-v2/internal/configsources@v1.1.33/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go-v2/internal/configsources@v1.1.41/LICENSE.txt: Apache License @@ -32164,11 +32164,11 @@ Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go-v2/inter -------------------------------------------------------------------------------- Dependency : github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 -Version: v2.4.27 +Version: v2.4.35 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go-v2/internal/endpoints/v2@v2.4.27/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/aws/aws-sdk-go-v2/internal/endpoints/v2@v2.4.35/LICENSE.txt: Apache License diff --git a/go.mod b/go.mod index 4cf3e5c020e..43073154e1f 100644 --- a/go.mod +++ b/go.mod @@ -27,11 +27,11 @@ require ( github.com/apoydence/eachers v0.0.0-20181020210610-23942921fe77 // indirect github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 github.com/aws/aws-lambda-go v1.13.3 - github.com/aws/aws-sdk-go-v2 v1.18.0 + github.com/aws/aws-sdk-go-v2 v1.21.0 github.com/aws/aws-sdk-go-v2/config v1.17.7 github.com/aws/aws-sdk-go-v2/credentials v1.12.20 github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.26.0 - github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.15.5 + github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.24.0 github.com/aws/aws-sdk-go-v2/service/costexplorer v1.18.4 github.com/aws/aws-sdk-go-v2/service/ec2 v1.36.1 github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.18.4 @@ -198,7 +198,7 @@ require ( github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.33 github.com/aws/aws-sdk-go-v2/service/cloudformation v1.20.4 github.com/aws/aws-sdk-go-v2/service/kinesis v1.15.8 - github.com/aws/smithy-go v1.13.5 + github.com/aws/smithy-go v1.14.2 github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5 github.com/elastic/bayeux v1.0.5 github.com/elastic/elastic-agent-autodiscover v0.6.2 @@ -249,8 +249,8 @@ require ( github.com/armon/go-radix v1.0.0 // indirect github.com/aws/aws-sdk-go v1.38.60 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.14 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.9 // indirect diff --git a/go.sum b/go.sum index 371aa1efbb2..33fcf790331 100644 --- a/go.sum +++ b/go.sum @@ -285,8 +285,9 @@ github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVj github.com/aws/aws-sdk-go-v2 v1.16.3/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU= github.com/aws/aws-sdk-go-v2 v1.16.6/go.mod h1:6CpKuLXg2w7If3ABZCl/qZ6rEgwtjZTn4eAf4RcEyuw= github.com/aws/aws-sdk-go-v2 v1.16.16/go.mod h1:SwiyXi/1zTUZ6KIAmLK5V5ll8SiURNUYOqTerZPaF9k= -github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY= github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2 v1.21.0 h1:gMT0IW+03wtYJhRqTVYn0wLzwdnK9sRMcxmtfGzRdJc= +github.com/aws/aws-sdk-go-v2 v1.21.0/go.mod h1:/RfNgGmRxI+iFOB1OeJUyxiU+9s88k3pfHvDagGEp0M= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.3/go.mod h1:gNsR5CaXKmQSSzrmGxmwmct/r+ZBfbxorAuXYsj/M5Y= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8 h1:tcFliCWne+zOuUfKNRn8JdFBuWPDuISDH08wD2ULkhk= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8/go.mod h1:JTnlBSot91steJeti4ryyu/tLd4Sk84O5W22L7O2EQU= @@ -301,13 +302,15 @@ github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.33/go.mod h1:84XgODVR8uRhm github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.10/go.mod h1:F+EZtuIwjlv35kRJPyBGcsA4f7bnSoz15zOQ2lJq1Z4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.13/go.mod h1:wLLesU+LdMZDM3U0PP9vZXJW39zmD/7L4nY2pSrYZ/g= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.23/go.mod h1:2DFxAQ9pfIRy0imBCJv+vZ2X6RKxves6fbnEuSry6b4= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41 h1:22dGT7PneFMx4+b3pz7lMTRyN8ZKH7M2cW4GP9yUS2g= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41/go.mod h1:CrObHAuPneJBlfEJ5T3szXOUkLEThaGfvnhTf33buas= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.4/go.mod h1:8glyUqVIM4AmeenIsPo0oVh3+NUwnsQml2OFupfQW+0= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.7/go.mod h1:93Uot80ddyVzSl//xEJreNKMhxntr71WtR3v/A1cRYk= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.17/go.mod h1:pRwaTYCJemADaqCbUAxltMoHKata7hmB5PjEXeu0kfg= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35 h1:SijA0mgjV8E+8G45ltVHs0fvKpTj8xmZJ3VwhGKtUSI= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35/go.mod h1:SJC1nEVVva1g3pHAIdCp7QsRIkMmLAgoDquQ9Rr8kYw= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24 h1:wj5Rwc05hvUSvKuOF29IYb9QrCLjU+rHAy/x/o0DK2c= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24/go.mod h1:jULHjqqjDlbyTa7pfM7WICATnOv+iOhjletM3N0Xbu8= github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.14 h1:ZSIPAkAsCCjYrhqfw2+lNzWDzxzHXEckFkTePL5RSWQ= @@ -316,8 +319,8 @@ github.com/aws/aws-sdk-go-v2/service/cloudformation v1.20.4 h1:faP794ma9ZY/24XAV github.com/aws/aws-sdk-go-v2/service/cloudformation v1.20.4/go.mod h1:ybjChNDMfPtc7f8ILTb+ov6CpE/KtAae9fD8HHtYfzU= github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.26.0 h1:sSzrsKQULJmPtmu6By4wR6g0701nGqonssKOy35uOd0= github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.26.0/go.mod h1:t5mizLPjCYafXoHCXOHJU7z4OvLbY70Echvb1ciBTV4= -github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.15.5 h1:aPK8IBVKeozo/pNGshT8xOJ2V3Y7ykOM49QcY0vhUSM= -github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.15.5/go.mod h1:ErjxucZaraVbYm66xxub00qmGBw7md2RFqy6624KbR8= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.24.0 h1:6LRil7J+uh2SZ58Wkm/5aVRpBOZbTtwi8p8gdsix94c= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.24.0/go.mod h1:5v2ZNXCSwG73rx0k3sCuB1Ju8sbEbG0iUlxCA7D8sV8= github.com/aws/aws-sdk-go-v2/service/costexplorer v1.18.4 h1:jbfG3cbq1kiK1/OAfUh4zf1ADtAU8KoeOPfF94S96pU= github.com/aws/aws-sdk-go-v2/service/costexplorer v1.18.4/go.mod h1:yC5cDNa3xzSh5NIU5x0NBBo6QkcsaM0tuPNCczeUPoU= github.com/aws/aws-sdk-go-v2/service/ec2 v1.36.1 h1:FS8Ja6LuLDVHcX+rmoNpOXqYb52N2A5DwQy7Dgduq4Q= @@ -358,8 +361,9 @@ github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAm github.com/aws/smithy-go v1.11.2/go.mod h1:3xHYmszWVx2c0kIwQeEVf9uSm4fYZt67FBJnwub1bgM= github.com/aws/smithy-go v1.12.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/aws/smithy-go v1.13.3/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= -github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= +github.com/aws/smithy-go v1.14.2 h1:MJU9hqBGbvWZdApzpvoF2WAIJDbtjK2NDJSiJP7HblQ= +github.com/aws/smithy-go v1.14.2/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/awslabs/goformation/v3 v3.1.0/go.mod h1:hQ5RXo3GNm2laHWKizDzU5DsDy+yNcenSca2UxN0850= github.com/awslabs/goformation/v4 v4.1.0 h1:JRxIW0IjhYpYDrIZOTJGMu2azXKI+OK5dP56ubpywGU= github.com/awslabs/goformation/v4 v4.1.0/go.mod h1:MBDN7u1lMNDoehbFuO4uPvgwPeolTMA2TzX1yO6KlxI= diff --git a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc index c2b898da358..9a038b8c6f2 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc @@ -39,20 +39,31 @@ The `aws-cloudwatch` input supports the following configuration options plus the [float] ==== `log_group_arn` -ARN of the log group to collect logs from. +ARN of the log group to collect logs from. *If using +https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Unified-Cross-Account.html[cross-account observability], +the ARN is required to collect logs from linked source accounts.* +`log_group_name` and `log_group_name_prefix` do not currently support CloudWatch +cross-account observability. + +Only one of `log_group_arn` and `log_group_name` should be configured. +If both are set, `log_group_arn` takes precidence. [float] ==== `log_group_name` Name of the log group to collect logs from. Note: `region_name` is required when log_group_name is given. +Only one of `log_group_arn` and `log_group_name` should be configured. +If both are set, `log_group_arn` takes precidence. + [float] ==== `log_group_name_prefix` The prefix for a group of log group names. Note: `region_name` is required when -log_group_name_prefix is given. `log_group_name` and `log_group_name_prefix` -cannot be given at the same time. The number of workers that will process the +log_group_name_prefix is given. The number of workers that will process the log groups under this prefix is set through the `number_of_workers` config. +Only one of `log_group_name` and `log_group_name_prefix` can be configured. + [float] ==== `region_name` Region that the specified log group or log group prefix belongs to. diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go index ca54721bd27..a3f81c5b65c 100644 --- a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go @@ -12,6 +12,7 @@ import ( "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/arn" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" @@ -69,7 +70,19 @@ func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, star // getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error { - // construct FilterLogEventsInput + var logGroupName string + if arn.IsARN(logGroup) { + var err error + logGroupName, _, err = parseARN(logGroup) + if err != nil { + return fmt.Errorf("invalid log group ARN (%s): %w", logGroup, err) + } + } else { + logGroupName = logGroup + } + + // construct FilterLogEventsInput using logGroup instead of logGroupName in order to + // support cross-account log collection using ARN filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup) paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput) for paginator.HasMorePages() { @@ -88,16 +101,16 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client p.log.Debug("done sleeping") p.log.Debugf("Processing #%v events", len(logEvents)) - logProcessor.processLogEvents(logEvents, logGroup, p.region) + logProcessor.processLogEvents(logEvents, logGroupName, p.region) } return nil } func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput { filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{ - LogGroupName: awssdk.String(logGroup), - StartTime: awssdk.Int64(startTime), - EndTime: awssdk.Int64(endTime), + LogGroupIdentifier: awssdk.String(logGroup), + StartTime: awssdk.Int64(startTime), + EndTime: awssdk.Int64(endTime), } if len(p.logStreams) > 0 { diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index f9d69fe1184..ae76a8ce68a 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -69,13 +69,13 @@ func newInput(config config) (*cloudwatchInput, error) { return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) } + // if ARN is provided, parse out the region to ensure correct API calls if config.LogGroupARN != "" { - logGroupName, regionName, err := parseARN(config.LogGroupARN) + _, regionName, err := parseARN(config.LogGroupARN) if err != nil { return nil, fmt.Errorf("parse log group ARN failed: %w", err) } - config.LogGroupName = logGroupName config.RegionName = regionName } @@ -126,9 +126,21 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) } }) - logGroupNames, err := getLogGroupNames(svc, in.config.LogGroupNamePrefix, in.config.LogGroupName) - if err != nil { - return fmt.Errorf("failed to get log group names: %w", err) + // this list can contain either ARNs or plain log group names; + // ARNs provide support for cross-account monitoring capabilities, + // plain names do not (support will be added for this later). + var logGroups []string + + // Precidence is important here - do not re-order. + if in.config.LogGroupARN != "" { + logGroups = []string{in.config.LogGroupARN} + } else if in.config.LogGroupName != "" { + logGroups = []string{in.config.LogGroupName} + } else { + logGroups, err = getLogGroupNamesFromPrefix(svc, in.config.LogGroupNamePrefix) + if err != nil { + return fmt.Errorf("failed to get log group names: %w", err) + } } log := inputContext.Logger @@ -143,11 +155,11 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) in.config.LogStreams, in.config.LogStreamPrefix) logProcessor := newLogProcessor(log.Named("log_processor"), in.metrics, client, ctx) - cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroupNames))) - return in.Receive(svc, cwPoller, ctx, logProcessor, logGroupNames) + cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroups))) + return in.Receive(svc, cwPoller, ctx, logProcessor, logGroups) } -func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwatchPoller, ctx context.Context, logProcessor *logProcessor, logGroupNames []string) error { +func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwatchPoller, ctx context.Context, logProcessor *logProcessor, logGroups []string) error { // This loop tries to keep the workers busy as much as possible while // honoring the number in config opposed to a simpler loop that does one // listing, sequentially processes every object and then does another listing @@ -175,17 +187,17 @@ func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwa } workerWg.Add(availableWorkers) - logGroupNamesLength := len(logGroupNames) + logGroupsLength := len(logGroups) runningGoroutines := 0 - for i := lastLogGroupOffset; i < logGroupNamesLength; i++ { + for i := lastLogGroupOffset; i < logGroupsLength; i++ { if runningGoroutines >= availableWorkers { break } runningGoroutines++ lastLogGroupOffset = i + 1 - if lastLogGroupOffset >= logGroupNamesLength { + if lastLogGroupOffset >= logGroupsLength { // release unused workers cwPoller.workerSem.Release(availableWorkers - runningGoroutines) for j := 0; j < availableWorkers-runningGoroutines; j++ { @@ -194,7 +206,7 @@ func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwa lastLogGroupOffset = 0 } - lg := logGroupNames[i] + lg := logGroups[i] go func(logGroup string, startTime int64, endTime int64) { defer func() { cwPoller.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroup) @@ -231,12 +243,8 @@ func parseARN(logGroupARN string) (string, string, error) { return "", "", fmt.Errorf("cannot get log group name from log group ARN: %s", logGroupARN) } -// getLogGroupNames uses DescribeLogGroups API to retrieve all log group names -func getLogGroupNames(svc *cloudwatchlogs.Client, logGroupNamePrefix string, logGroupName string) ([]string, error) { - if logGroupNamePrefix == "" { - return []string{logGroupName}, nil - } - +// getLogGroupNamesFromPrefix uses DescribeLogGroups API to retrieve all log group names +func getLogGroupNamesFromPrefix(svc *cloudwatchlogs.Client, logGroupNamePrefix string) ([]string, error) { // construct DescribeLogGroupsInput describeLogGroupsInput := &cloudwatchlogs.DescribeLogGroupsInput{ LogGroupNamePrefix: awssdk.String(logGroupNamePrefix), diff --git a/x-pack/filebeat/input/awscloudwatch/processor.go b/x-pack/filebeat/input/awscloudwatch/processor.go index 999cad4d7f0..6510bb7d8f2 100644 --- a/x-pack/filebeat/input/awscloudwatch/processor.go +++ b/x-pack/filebeat/input/awscloudwatch/processor.go @@ -35,9 +35,9 @@ func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Cli } } -func (p *logProcessor) processLogEvents(logEvents []types.FilteredLogEvent, logGroup string, regionName string) { +func (p *logProcessor) processLogEvents(logEvents []types.FilteredLogEvent, logGroupName string, regionName string) { for _, logEvent := range logEvents { - event := createEvent(logEvent, logGroup, regionName) + event := createEvent(logEvent, logGroupName, regionName) p.publish(p.ack, &event) } } @@ -49,23 +49,23 @@ func (p *logProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event p.publisher.Publish(*event) } -func createEvent(logEvent types.FilteredLogEvent, logGroup string, regionName string) beat.Event { +func createEvent(logEvent types.FilteredLogEvent, logGroupName string, regionName string) beat.Event { event := beat.Event{ Timestamp: time.Unix(*logEvent.Timestamp/1000, 0).UTC(), Fields: mapstr.M{ "message": *logEvent.Message, - "log.file.path": logGroup + "/" + *logEvent.LogStreamName, + "log.file.path": logGroupName + "/" + *logEvent.LogStreamName, "event": mapstr.M{ "id": *logEvent.EventId, "ingested": time.Now(), }, "awscloudwatch": mapstr.M{ - "log_group": logGroup, + "log_group": logGroupName, "log_stream": *logEvent.LogStreamName, "ingestion_time": time.Unix(*logEvent.IngestionTime/1000, 0), }, "aws.cloudwatch": mapstr.M{ - "log_group": logGroup, + "log_group": logGroupName, "log_stream": *logEvent.LogStreamName, "ingestion_time": time.Unix(*logEvent.IngestionTime/1000, 0), },