From 84d536daaf83a02343acd56836fd4082c9e49a53 Mon Sep 17 00:00:00 2001 From: Shaun Remekie Date: Fri, 15 Nov 2024 14:08:04 +0100 Subject: [PATCH] cds-1666 Add support for Assuming Role, Custom Role and Parameter to select Concurrency Limit (#117) * Add new variable LambdaAssumeRoleARN * added support for AssumeRole, Reserved Concurrent Execution parameter and removed circular dependency on DeadletterQueue * changelog * added aws_sdk_sts * updated readme * updated clients mod to support assume role for S3 and ECR clients * cargo fmt * added AssumeRole variable to config * added override condition for assume role and cargo fmt * align cargo.toml version with template version * allow ReservedConcurrentExecutions to have no value * remove add_role function from custom lambda code * add update_lambda_role function to custom lambda * Adjusted Kinesis Condition, Added ReservedConcurrencyLimit as an AWS::Novalue condition, Add additional custom resource Waiter to add delay after Role is updated with the Policy, Moved lambda Policy into its own block. 
* removed role update and reservced concurrency limit functions * Add permissions to kinesis integration, remove function update_lambda_role from custom resource code * Add the lambda permissions to create log group * Add to dlq test permission to access S3 bucket, fix permission to privatelink * fix permissions in lambda policy * update readme file * removed comment * updated readme with info on Role Parameters * removed unneeded function * removed unused import --------- Co-authored-by: guyrenny Co-authored-by: Concourse <42734517+coralogix-concourse@users.noreply.github.com> --- CHANGELOG.md | 9 +- Cargo.lock | 156 ++++++++++++--- Cargo.toml | 3 +- README.md | 159 ++++++++------- custom-resource/index.py | 3 +- src/clients.rs | 68 ++++++- src/lib.rs | 2 +- src/logs/config.rs | 2 + src/logs/coralogix.rs | 6 +- src/logs/mod.rs | 51 ++--- src/logs/process.rs | 50 +++-- src/main.rs | 37 ++-- src/metrics/mod.rs | 8 +- template.yaml | 403 +++++++++++++++++++++++++++------------ tests/logs.rs | 2 +- 15 files changed, 654 insertions(+), 305 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea445a1..d6eae12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,13 @@ # Changelog -## v1.0.15 / 2024-01-24 +## v1.0.15 / 2024-11-09 +### 💡 Enhancements 💡 +- Add new parameter `LambdaAssumeRoleARN` which accept role arn, that the lambda will use for Execution role. +- Update internal code to support the new parameter `LambdaAssumeRoleARN`. +- Add new parameter ReservedConcurrentExecutions to the lambda function. +- Removed circular dependency between DeadLetterQueue and CustomResourceFunction + +## v1.0.14 / 2024-01-24 ### 💡 Enhancements 💡 - Internal code refactoring to isolate logs workflow from additional telemetry workflows to come. 
diff --git a/Cargo.lock b/Cargo.lock index 5a30e68..e5e8e1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -219,9 +219,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e16838e6c9e12125face1c1eff1343c75e3ff540de98ff7ebd61874a89bcfeb9" +checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -231,15 +231,16 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.3.1" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87c5f920ffd1e0526ec9e70e50bf444db50b204395a0fa7016bbf9e31ea1698f" +checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" dependencies = [ "aws-credential-types", "aws-sigv4", "aws-smithy-async", "aws-smithy-eventstream", "aws-smithy-http", + "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", @@ -247,6 +248,7 @@ dependencies = [ "fastrand", "http 0.2.12", "http-body 0.4.6", + "once_cell", "percent-encoding", "pin-project-lite", "tracing", @@ -401,9 +403,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.36.0" +version = "1.49.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32e6ecdb2bd756f3b2383e6f0588dc10a4e65f5d551e70a56e0bfe0c884673ce" +checksum = "53dcf5e7d9bd1517b8b998e170e650047cea8a2b85fe1835abe3210713e541b7" dependencies = [ "aws-credential-types", "aws-runtime", @@ -424,9 +426,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.3" +version = "1.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5df1b0fa6be58efe9d4ccc257df0a53b89cd8909e86591a13ca54817c87517be" +checksum = "5619742a0d8f253be760bfbb8e8e8368c69e3587e4637af5754e488a611499b1" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -485,9 +487,9 @@ 
dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.4" +version = "0.60.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858" +checksum = "cef7d0a272725f87e51ba2bf89f8c21e4df61b9e49ae1ac367a6d69916ef7c90" dependencies = [ "aws-smithy-types", "bytes", @@ -496,9 +498,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.9" +version = "0.60.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9cd0ae3d97daa0a2bf377a4d8e8e1362cae590c4a1aad0d40058ebca18eb91e" +checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -526,12 +528,15 @@ dependencies = [ [[package]] name = "aws-smithy-protocol-test" -version = "0.60.8" +version = "0.63.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "020468b04f916b36e0a791c4ebf80777ad2c25d8b9ebb8db14939e98a37abec0" +checksum = "3b92b62199921f10685c6b588fdbeb81168ae4e7950ae3e5f50145a01bb5f1ad" dependencies = [ "assert-json-diff", "aws-smithy-runtime-api", + "base64-simd", + "cbor-diag", + "ciborium", "http 0.2.12", "pretty_assertions", "regex-lite", @@ -552,9 +557,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.6.2" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce87155eba55e11768b8c1afa607f3e864ae82f03caf63258b37455b0ad02537" +checksum = "be28bd063fa91fd871d131fc8b68d7cd4c5fa0869bea68daca50dcb1cbd76be2" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -584,9 +589,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.7.1" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30819352ed0a04ecf6a2f3477e344d2d1ba33d43e0f09ad9047c12e0d923616f" +checksum = 
"92165296a47a812b267b4f41032ff8069ab7ff783696d217f0994a0d7ab585cd" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -601,9 +606,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.0" +version = "1.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfe321a6b21f5d8eabd0ade9c55d3d0335f3c3157fc2b3e87f05f34b539e4df5" +checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510" dependencies = [ "base64-simd", "bytes", @@ -627,9 +632,9 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.8" +version = "0.60.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d123fbc2a4adc3c301652ba8e149bf4bc1d1725affb9784eb20c953ace06bf55" +checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc" dependencies = [ "xmlparser", ] @@ -810,6 +815,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bs58" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf88ba1141d185c399bee5288d850d63b8369520c1eafc32a0430b5b6c287bf4" +dependencies = [ + "tinyvec", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -835,6 +849,25 @@ dependencies = [ "either", ] +[[package]] +name = "cbor-diag" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc245b6ecd09b23901a4fbad1ad975701fd5061ceaef6afa93a2d70605a64429" +dependencies = [ + "bs58", + "chrono", + "data-encoding", + "half", + "nom", + "num-bigint", + "num-rational", + "num-traits", + "separator", + "url", + "uuid", +] + [[package]] name = "cc" version = "1.1.6" @@ -862,6 +895,33 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + 
+[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "colorchoice" version = "1.0.1" @@ -882,7 +942,7 @@ checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" [[package]] name = "coralogix-aws-shipper" -version = "1.0.14" +version = "1.0.15" dependencies = [ "anyhow", "async-recursion", @@ -892,6 +952,7 @@ dependencies = [ "aws-sdk-s3", "aws-sdk-secretsmanager", "aws-sdk-sqs", + "aws-sdk-sts", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -964,6 +1025,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-bigint" version = "0.4.9" @@ -1089,6 +1156,12 @@ dependencies = [ "ordered-float", ] +[[package]] +name = "data-encoding" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" + [[package]] name = "der" version = "0.6.1" @@ -1423,6 +1496,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1953,6 +2036,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -1968,6 +2061,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -2544,6 +2648,12 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +[[package]] +name = "separator" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f97841a747eef040fcd2e7b3b9a220a7205926e60488e673d9e4926d27772ce5" + [[package]] name = "serde" version = "1.0.204" diff --git a/Cargo.toml b/Cargo.toml index 1e9b260..02e3c11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "coralogix-aws-shipper" -version = "1.0.14" +version = "1.0.15" edition = "2021" [dependencies] @@ -37,6 +37,7 @@ chrono = "0.4.37" async-recursion = "1.1.0" md5 = "0.7.0" anyhow = "1.0.86" +aws-sdk-sts = "1.49.0" [dev-dependencies] pretty_assertions_sorted = "1.2.1" diff --git a/README.md b/README.md index 0dba313..fece530 100644 --- a/README.md +++ b/README.md @@ -76,19 +76,41 @@ Use an existing Coralogix [Send-Your-Data API key](https://coralogix.com/docs/se > **Note:** You should always deploy the AWS Lambda function in the same AWS Region as your resource (e.g. the S3 bucket). -| Parameter | Description | Default Value | Required | -|-----------|--------------|---------------|----------| -| Application name | This will also be the name of the CloudFormation stack that creates your integration. 
It can include letters (A–Z and a–z), numbers (0–9) and dashes (-). | | :heavy_check_mark: | -| IntegrationType | Choose the AWS service that you wish to integrate with Coralogix. Can be one of: S3, CloudTrail, VpcFlow, CloudWatch, S3Csv, SNS, SQS, CloudFront, Kinesis, Kafka, MSK, EcrScan. | S3 | :heavy_check_mark: | -| CoralogixRegion | Your data source should be in the same region as the integration stack. You may choose from one of [the default Coralogix regions](https://coralogix.com/docs/coralogix-domain/): [Custom, EU1, EU2, AP1, AP2, US1, US2]. If this value is set to Custom you must specify the Custom Domain to use via the CustomDomain parameter. | Custom | :heavy_check_mark: | -| CustomDomain | If you choose a custom domain name for your private cluster, Coralogix will send telemetry from the specified address (e.g. custom.coralogix.com). | | | -| ApplicationName | The name of the application for which the integration is configured. [Advanced Configuration](#advanced-configuration) specifies dynamic value retrieval options.| | :heavy_check_mark: | -| SubsystemName | Specify the [name of your subsystem](https://coralogix.com/docs/application-and-subsystem-names/). For a dynamic value, refer to the Advanced Configuration section. For CloudWatch, leave this field empty to use the log group name. | | :heavy_check_mark: | | ApiKey | The Send-Your-Data [API Key](https://coralogix.com/docs/send-your-data-api-key/) validates your authenticity. This value can be a direct Coralogix API Key or an AWS Secret Manager ARN containing the API Key.| | :heavy_check_mark: | -| ApiKey | The Send-Your-Data [API Key](https://coralogix.com/docs/send-your-data-api-key/) validates your authenticity. This value can be a direct Coralogix API Key or an AWS Secret Manager ARN containing the API Key.
_Note the parameter expects the API Key in plain text or if stored in secret manager._| | :heavy_check_mark: | -| StoreAPIKeyInSecretsManager | Enable this to store your API Key securely. Otherwise, it will remain exposed in plain text as an environment variable in the Lambda function console.| True | :heavy_check_mark: | +| Parameter | Description | Default Value | Required | +|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|--------------------| +| Application name | This will also be the name of the CloudFormation stack that creates your integration. It can include letters (A–Z and a–z), numbers (0–9) and dashes (-). | | :heavy_check_mark: | +| IntegrationType | Choose the AWS service that you wish to integrate with Coralogix. Can be one of: S3, CloudTrail, VpcFlow, CloudWatch, S3Csv, SNS, SQS, CloudFront, Kinesis, Kafka, MSK, EcrScan. | S3 | :heavy_check_mark: | +| CoralogixRegion | Your data source should be in the same region as the integration stack. You may choose from one of [the default Coralogix regions](https://coralogix.com/docs/coralogix-domain/): [Custom, EU1, EU2, AP1, AP2, US1, US2]. If this value is set to Custom you must specify the Custom Domain to use via the CustomDomain parameter. | Custom | :heavy_check_mark: | +| CustomDomain | If you choose a custom domain name for your private cluster, Coralogix will send telemetry from the specified address (e.g. custom.coralogix.com). | | | +| ApplicationName | The name of the application for which the integration is configured. [Advanced Configuration](#advanced-configuration) specifies dynamic value retrieval options. 
| | :heavy_check_mark: | +| SubsystemName | Specify the [name of your subsystem](https://coralogix.com/docs/application-and-subsystem-names/). For a dynamic value, refer to the Advanced Configuration section. For CloudWatch, leave this field empty to use the log group name. | | :heavy_check_mark: | +| ApiKey | The Send-Your-Data [API Key](https://coralogix.com/docs/send-your-data-api-key/) validates your authenticity. This value can be a direct Coralogix API Key or an AWS Secret Manager ARN containing the API Key.
*Note the parameter expects the API Key in plain text or if stored in secret manager.* | | :heavy_check_mark: | +| StoreAPIKeyInSecretsManager | Enable this to store your API Key securely. Otherwise, it will remain exposed in plain text as an environment variable in the Lambda function console. | True | :heavy_check_mark: | +| ReservedConcurrentExecutions | The number of concurrent executions that are reserved for the function, leave empty so the lambda will use unreserved account concurrency. | n/a | | +| LambdaAssumeRoleARN | A role that the lambda will assume, leave empty to use the default permissions.
Note that if this Parameter is used, all __S3__ and __ECR__ API calls from the lambda will be made with the permissions of the Assumed Role. | | | +| ExecutionRoleARN | The arn of a user defined role that will be used as the execution role for the lambda function | | | > **Note:** `EcrScan` doesn't need any extra configuration. +#### Working with Roles + +In some cases special or more fine tuned IAM permissions are required. The AWS Shipper supports more granular IAM control using 2 parameters: + +- __LambdaAssumeRoleARN__: This parameter allows you to specify a Role ARN, enabling the Lambda function to assume the role. The assumed role will only affect S3 and ECR API calls, as these are the only services invoked by the Lambda function at the code level. + + +- __ExecutionRoleARN__: This parameter lets you specify the Execution Role for the AWS Shipper Lambda. The provided role must have basic Lambda execution permissions, and any additional permissions required for the Lambda’s operation will be automatically added during deployment. + +Basic lambda execution role permission: + +```yaml + Statement: + - Effect: "Allow" + Principal: + Service: "lambda.amazonaws.com" + Action: "sts:AssumeRole" +``` + ### S3/CloudTrail/VpcFlow/S3Csv Configuration This is the most flexible type of integration, as it is based on receiving log files to Amazon S3. First, your bucket can receive log files from all kinds of other services, such as CloudTrail, VPC Flow Logs, Redshift, Network Firewall or different types of load balancers (ALB/NLB/ELB). Once the data is in the bucket, a pre-made Lambda function will then transmit it to your Coralogix account. @@ -101,105 +123,104 @@ If you don’t want to send data directly as it enters S3, you can also use SNS/ > **Note:** All resources, such as S3 or SNS/SQS, should be provisioned already. If you are using an S3 bucket as a resource, please make sure it is clear of any Lambda triggers located in the same AWS region as your new function. 
-| Parameter | Description | Default Value | Required | -|-----------|-------------|---------------|----------| -| S3BucketName | Specify the name of the AWS S3 bucket that you want to monitor. | | :heavy_check_mark: | -| S3KeyPrefix | Specify the prefix of the log path within your S3 bucket. This value is ignored if you use the SNSTopicArn/SQSTopicArn parameter.| CloudTrail/VpcFlow 'AWSLogs/' | | -| S3KeySuffix | Filter for the suffix of the file path in your S3 bucket. This value is ignored if you use the SNSTopicArn/SQSTopicArn parameter. | CloudTrail '.json.gz', VpcFlow '.log.gz' | | -| NewlinePattern | Enter a regular expression to detect a new log line for multiline logs, e.g., \n(?=\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}). | | | -| SNSTopicArn | The ARN for the SNS topic that contains the SNS subscription responsible for retrieving logs from Amazon S3. | | | -| SQSTopicArn | The ARN for the SQS queue that contains the SQS subscription responsible for retrieving logs from Amazon S3.| | | -| CSVDelimiter | Specify a single character to be used as a delimiter when ingesting a CSV file with a header line. This value is applicable when the S3Csv integration type is selected, for example, “,” or ” “. | , | | +| Parameter | Description | Default Value | Required | +|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------|--------------------| +| S3BucketName | Specify the name of the AWS S3 bucket that you want to monitor. | | :heavy_check_mark: | +| S3KeyPrefix | Specify the prefix of the log path within your S3 bucket. This value is ignored if you use the SNSTopicArn/SQSTopicArn parameter. | CloudTrail/VpcFlow 'AWSLogs/' | | +| S3KeySuffix | Filter for the suffix of the file path in your S3 bucket. This value is ignored if you use the SNSTopicArn/SQSTopicArn parameter. 
| CloudTrail '.json.gz', VpcFlow '.log.gz' | | +| NewlinePattern | Enter a regular expression to detect a new log line for multiline logs, e.g., \n(?=\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}). | | | +| SNSTopicArn | The ARN for the SNS topic that contains the SNS subscription responsible for retrieving logs from Amazon S3. | | | +| SQSTopicArn | The ARN for the SQS queue that contains the SQS subscription responsible for retrieving logs from Amazon S3. | | | +| CSVDelimiter | Specify a single character to be used as a delimiter when ingesting a CSV file with a header line. This value is applicable when the S3Csv integration type is selected, for example, “,” or ” “. | , | | ### CloudWatch Configuration Coralogix can be configured to receive data directly from your CloudWatch log group. CloudWatch logs are streamed directly to Coralogix via Lambda. This option does not use S3. You must provide the log group name as a parameter during setup. -| Parameter | Description | Default Value | Required | -|-----------|--------------|---------------|----------| -| CloudWatchLogGroupName | Provide a comma-separated list of CloudWatch log group names to monitor, for example, (`log-group1`, `log-group2`, `log-group3`). | | :heavy_check_mark: | -| CloudWatchLogGroupPrefix | Prefix of the CloudWatch log groups that will trigger the lambda, in case that your log groups are `log-group1, log-group2, log-group3` then you can set the value to `log-group`. When using this variable you will not be able to see the log groups as trigger for the lambda. 
The parameter dose not replace **CloudWatchLogGroupName** parameter | | | +| Parameter | Description | Default Value | Required | +|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|--------------------| +| CloudWatchLogGroupName | Provide a comma-separated list of CloudWatch log group names to monitor, for example, (`log-group1`, `log-group2`, `log-group3`). | | :heavy_check_mark: | +| CloudWatchLogGroupPrefix | Prefix of the CloudWatch log groups that will trigger the lambda, in case that your log groups are `log-group1, log-group2, log-group3` then you can set the value to `log-group`. When using this variable you will not be able to see the log groups as trigger for the lambda. The parameter dose not replace **CloudWatchLogGroupName** parameter | | | -In case your log group name is longer than 70, than in the lambda function you will see the permission for that log group as: -`allow-trigger-from-` this is because of length limit in AWS for permission name. +In case your log group name is longer than 70, than in the lambda function you will see the permission for that log group as: `allow-trigger-from-` this is because of length limit in AWS for permission name. ### SNS Configuration To receive SNS messages directly to Coralogix, use the `SNSIntegrationTopicARN` parameter. This differs from the above use of `SNSTopicArn`, which notifies based on S3 events. -| Parameter | Description | Default Value | Required | -|-----------|--------------|---------------|----------| -| SNSIntegrationTopicArn | Provide the ARN of the SNS topic to which you want to subscribe for retrieving messages. 
| | :heavy_check_mark: | +| Parameter | Description | Default Value | Required | +|------------------------|------------------------------------------------------------------------------------------|---------------|--------------------| +| SNSIntegrationTopicArn | Provide the ARN of the SNS topic to which you want to subscribe for retrieving messages. | | :heavy_check_mark: | ### SQS Configuration To receive SQS messages directly to Coralogix, use the `SQSIntegrationTopicARN` parameter. This differs from the above use of `SQSTopicArn`, which notifies based on S3 events. -| Parameter | Description | Default Value | Required | -|-----------|-------------|---------------|----------| -| SQSIntegrationTopicArn | Provide the ARN of the SQS queue to which you want to subscribe for retrieving messages. | | :heavy_check_mark: | +| Parameter | Description | Default Value | Required | +|------------------------|------------------------------------------------------------------------------------------|---------------|--------------------| +| SQSIntegrationTopicArn | Provide the ARN of the SQS queue to which you want to subscribe for retrieving messages. | | :heavy_check_mark: | ### Kinesis Configuration We can receive direct [Kinesis](https://aws.amazon.com/kinesis/) stream data from your AWS account to Coralogix. Your Kinesis stream ARN is a required parameter in this case. -| Parameter | Description | Default Value | Required | -|-----------|-------------|---------------|----------| -| KinesisStreamArn | Provide the ARN of the Kinesis Stream to which you want to subscribe for retrieving messages. | | :heavy_check_mark: | +| Parameter | Description | Default Value | Required | +|------------------|-----------------------------------------------------------------------------------------------|---------------|--------------------| +| KinesisStreamArn | Provide the ARN of the Kinesis Stream to which you want to subscribe for retrieving messages. 
| | :heavy_check_mark: | ### Kafka Configuration -| Parameter | Description | Default Value | Required | -|-----------|-------------|---------------|----------| -| KafkaBrokers | Comma-delimited list of Kafka brokers to establish a connection with. | | :heavy_check_mark: | -| KafkaTopic | Subscribe to this Kafka topic for data consumption.| | :heavy_check_mark: | -| KafkaBatchSize | Specify the size of data batches to be read from Kafka during each retrieval. | 100 | | -| KafkaSecurityGroups | Comma-delimited list of Kafka security groups for secure connection setup. | | :heavy_check_mark: | -| KafkaSubnets | Comma-delimited list of Kafka subnets to use when connecting to Kafka. | | :heavy_check_mark: | +| Parameter | Description | Default Value | Required | +|---------------------|-------------------------------------------------------------------------------|---------------|--------------------| +| KafkaBrokers | Comma-delimited list of Kafka brokers to establish a connection with. | | :heavy_check_mark: | +| KafkaTopic | Subscribe to this Kafka topic for data consumption. | | :heavy_check_mark: | +| KafkaBatchSize | Specify the size of data batches to be read from Kafka during each retrieval. | 100 | | +| KafkaSecurityGroups | Comma-delimited list of Kafka security groups for secure connection setup. | | :heavy_check_mark: | +| KafkaSubnets | Comma-delimited list of Kafka subnets to use when connecting to Kafka. | | :heavy_check_mark: | ### MSK Configuration Your Lambda function must be in a VPC that has access to the MSK cluster. You can configure your VPC via the provided [VPC configuration parameters](#vpc-configuration-optional). -| Parameter | Description | Default Value | Required | -|-----------|-------------|---------------|----------| -| MSKBrokers | Comma-delimited list of MSK brokers to connect to. | | :heavy_check_mark: | -| KafkaTopic | Comma separated list of Kafka topics to Subscribe to. 
| | :heavy_check_mark: | +| Parameter | Description | Default Value | Required | +|------------|-------------------------------------------------------|---------------|--------------------| +| MSKBrokers | Comma-delimited list of MSK brokers to connect to. | | :heavy_check_mark: | +| KafkaTopic | Comma separated list of Kafka topics to Subscribe to. | | :heavy_check_mark: | ### Generic Configuration (Optional) These are optional parameters if you wish to receive notification emails, exclude certain logs or send messages to Coralogix at a particular rate. -| Parameter | Description | Default Value | Required | -|-----------|--------------|---------------|----------| -| NotificationEmail | A failure notification will be sent to this email address.| | | -| BlockingPattern | Enter a regular expression to identify lines excluded from being sent to Coralogix. For example, use `MainActivity.java:\d{3}` to match log lines with `MainActivity` followed by exactly three digits. | | | -| SamplingRate | Send messages at a specific rate, such as 1 out of every N logs. For example, if your value is 10, a message will be sent for every 10th log.| 1 | :heavy_check_mark: | -| AddMetadata | Add aws event metadata to the log message. Expects comma separated values. Options for S3 are `bucket_name`,`key_name`. For CloudWatch use `stream_name`, `loggroup_name` . For Kafka/MSK use `topic_name` | | -| CustomMetadata | Add custom metadata to the log message. Expects comma separated values. Options are key1=value1,key2=value2 | | | +| Parameter | Description | Default Value | Required | +|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|--------------------| +| NotificationEmail | A failure notification will be sent to this email address. 
| | | +| BlockingPattern | Enter a regular expression to identify lines excluded from being sent to Coralogix. For example, use `MainActivity.java:\d{3}` to match log lines with `MainActivity` followed by exactly three digits. | | | +| SamplingRate | Send messages at a specific rate, such as 1 out of every N logs. For example, if your value is 10, a message will be sent for every 10th log. | 1 | :heavy_check_mark: | +| AddMetadata | Add aws event metadata to the log message. Expects comma separated values. Options for S3 are `bucket_name`,`key_name`. For CloudWatch use `stream_name`, `loggroup_name` . For Kafka/MSK use `topic_name` | | | +| CustomMetadata | Add custom metadata to the log message. Expects comma separated values. Options are key1=value1,key2=value2 | | | ### Lambda Configuration (Optional) These are the default presets for Lambda. Read [Troubleshooting](#troubleshooting) for more information on changing these defaults. -| Parameter | Description | Default Value | Required | -|-----------|-------------|---------------|----------| -| FunctionMemorySize | Specify the memory size for the Lambda function in megabytes. | 1024 | :heavy_check_mark: | -| FunctionTimeout | Set a timeout for the Lambda function in seconds. | 300 | :heavy_check_mark: | -| LogLevel | Specify the log level for the Lambda function, choosing from the following options: INFO, WARN, ERROR, DEBUG. | WARN | :heavy_check_mark: | -| LambdaLogRetention | Set the CloudWatch log retention period (in days) for logs generated by the Lambda function. | 5 | :heavy_check_mark: | -| FunctionRunTime | Type of runtime for the lambda, allowd values are provided.al2023 or provided.al2. | provided.al2023 | :heavy_check_mark: | -| FunctionArchitectures | Architectures for the lambda function, allowed values are arm64 or x86_64. 
| arm64 | :heavy_check_mark: | +| Parameter | Description | Default Value | Required | +|-----------------------|---------------------------------------------------------------------------------------------------------------|-----------------|--------------------| +| FunctionMemorySize | Specify the memory size for the Lambda function in megabytes. | 1024 | :heavy_check_mark: | +| FunctionTimeout | Set a timeout for the Lambda function in seconds. | 300 | :heavy_check_mark: | +| LogLevel | Specify the log level for the Lambda function, choosing from the following options: INFO, WARN, ERROR, DEBUG. | WARN | :heavy_check_mark: | +| LambdaLogRetention | Set the CloudWatch log retention period (in days) for logs generated by the Lambda function. | 5 | :heavy_check_mark: | +| FunctionRunTime | Type of runtime for the lambda, allowd values are provided.al2023 or provided.al2. | provided.al2023 | :heavy_check_mark: | +| FunctionArchitectures | Architectures for the lambda function, allowed values are arm64 or x86_64. | arm64 | :heavy_check_mark: | ### VPC Configuration (Optional) Use the following options if you need to configure a private link with Coralogix. -| Parameter | Description | Default Value | Required | -|-----------|-------------|---------------|----------| -| LambdaSubnetID | Specify the ID of the subnet where the integration should be deployed. | | :heavy_check_mark: | -| LambdaSecurityGroupID | Specify the ID of the Security Group where the integration should be deployed. | | :heavy_check_mark: | -| UsePrivateLink | Set this to true if you will be using AWS PrivateLink. | false | :heavy_check_mark: | +| Parameter | Description | Default Value | Required | +|-----------------------|--------------------------------------------------------------------------------|---------------|--------------------| +| LambdaSubnetID | Specify the ID of the subnet where the integration should be deployed. 
| | :heavy_check_mark: | +| LambdaSecurityGroupID | Specify the ID of the Security Group where the integration should be deployed. | | :heavy_check_mark: | +| UsePrivateLink | Set this to true if you will be using AWS PrivateLink. | false | :heavy_check_mark: | ### Advanced Configuration @@ -225,7 +246,7 @@ The DLQ workflow for the Coralogix AWS Shipper is as follows: ![DLQ Workflow](./static/dlq-workflow.png) - To enable the DLQ, you must provide the required parameters outlined below. +To enable the DLQ, you must provide the required parameters outlined below. | Parameter | Description | Default Value | Required | |---------------|-------------------------------------------------------------------------------|---------------|--------------------| @@ -236,8 +257,7 @@ The DLQ workflow for the Coralogix AWS Shipper is as follows: ## Troubleshooting -**Parameter max value** -If you tried to deploy the integration and got this error `length is greater than 4094`, then you can upload the value of the parameter to an S3 bucket as txt and pass the file URL as the parameter value ( this option is available for `KafkaTopic` and `CloudWatchLogGroupName` parameters). +**Parameter max value** If you tried to deploy the integration and got this error `length is greater than 4094`, then you can upload the value of the parameter to an S3 bucket as txt and pass the file URL as the parameter value ( this option is available for `KafkaTopic` and `CloudWatchLogGroupName` parameters). **Timeout errors** @@ -251,8 +271,7 @@ If you see “Task out of memory”, you should increase the Lambda maximum Memo To add more verbosity to your function logs, set RUST_LOG to DEBUG. -**Trigger Failed on Deployment** -If Deployment is failing while asigning the trigger, please check that S3 Bucket notifications has no notifications enabled. If Using Cloudwatch max number of notificactions per LogGroup is 2. 
+**Trigger Failed on Deployment** If Deployment is failing while assigning the trigger, please check that S3 Bucket notifications has no notifications enabled. If Using Cloudwatch max number of notifications per LogGroup is 2. > **Warning:** Remember to change it back to WARN after troubleshooting. diff --git a/custom-resource/index.py b/custom-resource/index.py index 77809b1..c340235 100644 --- a/custom-resource/index.py +++ b/custom-resource/index.py @@ -501,7 +501,6 @@ def handle(self): responseStatus, physical_resource_id=self.event.get('PhysicalResourceId', self.context.aws_request_id) ) - def lambda_handler(event, context): ''' @@ -509,7 +508,7 @@ def lambda_handler(event, context): ''' print("Received event:", event) cfn = CFNResponse(event, context) - + integration_type = event['ResourceProperties']['Parameters']['IntegrationType'] dlq_enabled = event['ResourceProperties']['DLQ'].get('EnableDLQ', False) if dlq_enabled == 'false': diff --git a/src/clients.rs b/src/clients.rs index 2afb4f3..e8df45c 100644 --- a/src/clients.rs +++ b/src/clients.rs @@ -1,8 +1,11 @@ +use crate::assume_role; +use aws_config::SdkConfig; +use aws_sdk_ecr::config::Credentials as EcrCredentials; +use aws_sdk_ecr::Client as EcrClient; +use aws_sdk_s3::config::Credentials as S3Credentials; use aws_sdk_s3::Client as S3Client; use aws_sdk_sqs::Client as SqsClient; -use aws_sdk_ecr::Client as EcrClient; -use aws_config::SdkConfig; - +use aws_sdk_sts::{Client as StsClient, Error as StsError}; /// A type used to hold the AWS clients required to interact with AWS services /// used by the lambda function. 
@@ -21,4 +24,63 @@ impl AwsClients { sqs: SqsClient::new(&sdk_config), } } + + // new_assume_role() method to create a new AWS client with the provided role + pub async fn new_assume_role(sdk_config: &SdkConfig, role_arn: &str) -> Result<AwsClients, StsError> { + let sts_client = StsClient::new(&sdk_config); + let response = sts_client + .assume_role() + .role_arn(role_arn) + .role_session_name("CoralogixAWSShipperSession") + .send() + .await?; + + // Extract temporary credentials + let creds = response + .credentials() + .expect(format!("no credentials found for role_arn: {}", role_arn).as_str()); + + Ok(AwsClients { + s3: assume_role!( + "s3provider", + role_arn, + creds, + S3Credentials, + S3Client, + sdk_config + ), + ecr: assume_role!( + "ecrprovider", + role_arn, + creds, + EcrCredentials, + EcrClient, + sdk_config + ), + + // SQS permissions are only required for managing DLQ. The default client is sufficient for this. + sqs: SqsClient::new(&sdk_config), + }) + } +} + +// macro for creating a new AWS client with the provided role +#[macro_export] +macro_rules! 
assume_role { + ($provider_name:expr, $role_arn:expr, $creds:expr, $credentials:ident, $client:ty, $sdk_config:expr) => {{ + let credentials = $credentials::new( + $creds.access_key_id(), + $creds.secret_access_key(), + Some($creds.session_token().to_string()), + None, + $provider_name, + ); + + let config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .credentials_provider(credentials) + .load() + .await; + + <$client>::new(&config) + }}; } diff --git a/src/lib.rs b/src/lib.rs index ac6239e..0764f0d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,9 @@ use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; -pub mod logs; pub mod clients; pub mod events; +pub mod logs; pub fn set_up_logging() { tracing_subscriber::fmt() diff --git a/src/logs/config.rs b/src/logs/config.rs index be9fafa..4eacab1 100644 --- a/src/logs/config.rs +++ b/src/logs/config.rs @@ -27,6 +27,7 @@ pub struct Config { pub dlq_url: Option, pub dlq_retry_limit: Option, pub dlq_s3_bucket: Option, + pub lambda_assume_role: Option, } #[derive(PartialEq, Eq, Debug, Clone, Copy)] @@ -119,6 +120,7 @@ impl Config { dlq_url: env::var("DLQ_URL").ok(), dlq_retry_limit: env::var("DLQ_RETRY_LIMIT").ok(), dlq_s3_bucket: env::var("DLQ_S3_BUCKET").ok(), + lambda_assume_role: env::var("LAMBDA_ASSUME_ROLE").ok(), }; Ok(conf) diff --git a/src/logs/coralogix.rs b/src/logs/coralogix.rs index 193856d..216ff5e 100644 --- a/src/logs/coralogix.rs +++ b/src/logs/coralogix.rs @@ -1,5 +1,6 @@ use crate::logs::config::Config; use crate::logs::process::Metadata; +use crate::logs::*; use cx_sdk_rest_logs::auth::AuthData; use cx_sdk_rest_logs::model::{LogSinglesEntry, LogSinglesRequest, Severity}; use cx_sdk_rest_logs::DynLogExporter; @@ -7,14 +8,13 @@ use futures::stream::{StreamExt, TryStreamExt}; use itertools::Itertools; use serde::{Deserialize, Serialize}; use serde_json::Value; +use std::collections::HashMap; use std::env; use std::iter::IntoIterator; use std::time::Instant; use 
std::vec::Vec; use time::OffsetDateTime; -use tracing::{error, info, debug}; -use std::collections::HashMap; -use crate::logs::*; +use tracing::{debug, error, info}; pub async fn process_batches( logs: Vec, diff --git a/src/logs/mod.rs b/src/logs/mod.rs index b845952..1a7c04c 100644 --- a/src/logs/mod.rs +++ b/src/logs/mod.rs @@ -1,27 +1,25 @@ +use crate::clients::AwsClients; +use crate::events; +use crate::logs::config::IntegrationType; use async_recursion::async_recursion; -use cx_sdk_rest_logs::{DynLogExporter, RestLogExporter}; -use lambda_runtime::{Context, Error, LambdaEvent}; use aws_lambda_events::cloudwatch_logs::LogsEvent; use aws_lambda_events::event::cloudwatch_logs::AwsLogs; -use cx_sdk_rest_logs::config::{BackoffConfig, LogExporterConfig}; -use tracing::{debug, info}; -use std::time::Duration; -use std::collections::HashMap; -use crate::clients::AwsClients; -use crate::logs::config::IntegrationType; -use crate::events; -use http::header::USER_AGENT; use aws_lambda_events::event::s3::S3Event; use aws_sdk_s3::Client as S3Client; use aws_sdk_sqs::types::MessageAttributeValue; +use cx_sdk_rest_logs::config::{BackoffConfig, LogExporterConfig}; +use cx_sdk_rest_logs::{DynLogExporter, RestLogExporter}; +use http::header::USER_AGENT; +use lambda_runtime::{Context, Error, LambdaEvent}; +use std::collections::HashMap; use std::sync::Arc; - +use std::time::Duration; +use tracing::{debug, info}; pub mod config; -pub mod process; -pub mod ecr; pub mod coralogix; - +pub mod ecr; +pub mod process; pub fn set_up_coralogix_exporter(config: &config::Config) -> Result { let backoff = BackoffConfig { @@ -82,7 +80,8 @@ pub async fn handler( let s3_event = serde_json::from_str::(message)?; let (bucket, key) = handle_s3_event(s3_event).await?; info!("SNS S3 EVENT Detected"); - crate::logs::process::s3(&clients.s3, coralogix_exporter, config, bucket, key).await?; + crate::logs::process::s3(&clients.s3, coralogix_exporter, config, bucket, key) + .await?; } else { info!("SNS 
TEXT EVENT Detected"); crate::logs::process::sns_logs( @@ -96,8 +95,7 @@ pub async fn handler( events::Combined::CloudWatchLogs(logs_event) => { info!("CLOUDWATCH EVENT Detected"); let cloudwatch_event_log = handle_cloudwatch_logs_event(logs_event).await?; - process::cloudwatch_logs(cloudwatch_event_log, coralogix_exporter, config) - .await?; + process::cloudwatch_logs(cloudwatch_event_log, coralogix_exporter, config).await?; } events::Combined::Sqs(sqs_event) => { debug!("SQS Event: {:?}", sqs_event.records[0]); @@ -111,13 +109,9 @@ pub async fn handler( // note that there is no risk of hitting the recursion stack limit // here as recursiion will only be called as many times as there are nested // events in an SQS message - let result = handler( - clients, - coralogix_exporter.clone(), - config, - internal_event, - ) - .await; + let result = + handler(clients, coralogix_exporter.clone(), config, internal_event) + .await; if result.is_ok() { continue; @@ -193,12 +187,8 @@ pub async fn handler( result?; } else { debug!("SQS TEXT EVENT Detected"); - process::sqs_logs( - message.clone(), - coralogix_exporter.clone(), - config, - ) - .await?; + process::sqs_logs(message.clone(), coralogix_exporter.clone(), config) + .await?; } } } @@ -263,7 +253,6 @@ pub async fn handle_s3_event(s3_event: S3Event) -> Result<(String, String), Erro Ok((bucket, decoded_key)) } - async fn s3_store_failed_event( s3client: &S3Client, bucket: String, diff --git a/src/logs/process.rs b/src/logs/process.rs index dd419aa..1aae890 100644 --- a/src/logs/process.rs +++ b/src/logs/process.rs @@ -11,6 +11,7 @@ use fancy_regex::Regex; use flate2::read::MultiGzDecoder; use itertools::Itertools; use lambda_runtime::Error; +use std::env; use std::ffi::OsStr; use std::io::Read; use std::ops::Range; @@ -18,7 +19,6 @@ use std::path::Path; use std::string::String; use std::time::Instant; use tracing::{debug, info}; -use std::env; use crate::logs::config::{Config, IntegrationType}; use crate::logs::coralogix; 
@@ -130,7 +130,6 @@ pub async fn s3( Ok(()) } - pub struct Metadata { pub stream_name: String, pub log_group: String, @@ -179,12 +178,12 @@ pub async fn kinesis_logs( .clone() .unwrap_or_else(|| "NO SUBSYSTEM NAME".to_string()); let v = kinesis_message.0; - + let batches = if config.integration_type == IntegrationType::CloudWatch { tracing::debug!("CloudWatch IntegrationType Detected"); - + let cloudwatch_payload = ungzip(v, String::new())?; - + let string_cw = String::from_utf8(cloudwatch_payload)?; tracing::debug!("CloudWatch Payload {:?}", string_cw); let log_data: LogData = serde_json::from_str(&string_cw)?; @@ -195,7 +194,9 @@ pub async fn kinesis_logs( match ungzip(v.clone(), String::new()) { Ok(un_v) => un_v, Err(_) => { - tracing::error!("Data does not appear to be valid gzip format. Treating as UTF-8"); + tracing::error!( + "Data does not appear to be valid gzip format. Treating as UTF-8" + ); v } } @@ -215,7 +216,6 @@ pub async fn kinesis_logs( } }; - coralogix::process_batches( batches, &defined_app_name, @@ -329,7 +329,12 @@ pub async fn cloudwatch_logs( IntegrationType::CloudWatch => { metadata_instance.stream_name = cloudwatch_event_log.data.log_stream.clone(); metadata_instance.log_group = cloudwatch_event_log.data.log_group.clone(); - process_cloudwatch_logs(cloudwatch_event_log.data, config.sampling, &config.blocking_pattern).await? + process_cloudwatch_logs( + cloudwatch_event_log.data, + config.sampling, + &config.blocking_pattern, + ) + .await? 
} _ => { tracing::warn!( @@ -426,8 +431,11 @@ pub async fn get_bytes_from_s3( Ok(data) } - -async fn process_cloudwatch_logs(cw_event: LogData, sampling: usize, blocking_pattern: &str) -> Result, Error> { +async fn process_cloudwatch_logs( + cw_event: LogData, + sampling: usize, + blocking_pattern: &str, +) -> Result, Error> { let log_entries: Vec = cw_event .log_events .into_iter() @@ -438,7 +446,10 @@ async fn process_cloudwatch_logs(cw_event: LogData, sampling: usize, blocking_pa //Ok(sample(sampling, log_entries)) let re_block: Regex = Regex::new(blocking_pattern)?; info!("Blocking Pattern: {:?}", blocking_pattern); - Ok(sample(sampling, block(re_block, log_entries, blocking_pattern)?)) + Ok(sample( + sampling, + block(re_block, log_entries, blocking_pattern)?, + )) } async fn process_vpcflows( raw_data: Vec, @@ -505,12 +516,12 @@ async fn process_csv( .copied() .collect_vec() } else { - if custom_header.len() > 0{ + if custom_header.len() > 0 { flow_header = custom_header.split(csv_delimiter).collect_vec(); } else { flow_header = array_s[0].split(csv_delimiter).collect_vec(); } - tracing::debug!("Flow Header: {:?}", &flow_header); + tracing::debug!("Flow Header: {:?}", &flow_header); array_s .iter() .skip(1) @@ -660,7 +671,7 @@ fn ungzip(compressed_data: Vec, _: String) -> Result, Error> { return Ok(Vec::new()); } let mut decoder = MultiGzDecoder::new(&compressed_data[..]); - + let mut output = Vec::new(); let mut chunk = [0; 8192]; loop { @@ -670,11 +681,15 @@ fn ungzip(compressed_data: Vec, _: String) -> Result, Error> { break; } output.extend_from_slice(&chunk[..bytes_read]); - }, + } Err(err) => { - tracing::warn!(?err, "Problem decompressing data after {} bytes", output.len()); + tracing::warn!( + ?err, + "Problem decompressing data after {} bytes", + output.len() + ); return Ok(output); - }, + } } } if output.is_empty() { @@ -684,7 +699,6 @@ fn ungzip(compressed_data: Vec, _: String) -> Result, Error> { Ok(output) } - fn parse_records( flow_header: 
&[&str], records: &[&str], diff --git a/src/main.rs b/src/main.rs index b2e6483..ff94238 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,10 @@ -pub mod logs; -pub mod metrics; pub mod clients; pub mod events; +pub mod logs; +pub mod metrics; -use aws_config::BehaviorVersion; use crate::events::Combined; +use aws_config::BehaviorVersion; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; use tracing::{info, warn}; @@ -48,7 +48,7 @@ async fn main() -> Result<(), Error> { ); let aws_config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await; - let aws_clients = clients::AwsClients::new(&aws_config); + let mut aws_clients = clients::AwsClients::new(&aws_config); match mode { TelemetryMode::Traces => { @@ -60,11 +60,9 @@ async fn main() -> Result<(), Error> { warn!("metrics telemetry mode not implemented"); // TODO: implement metrics run(service_fn(|request: LambdaEvent| { - metrics::handler( - &aws_clients, - request, - ) - })).await + metrics::handler(&aws_clients, request) + })) + .await } // default to logs telemetry mode @@ -73,19 +71,22 @@ async fn main() -> Result<(), Error> { let mut conf = crate::logs::config::Config::load_from_env()?; let api_key_value = conf.api_key.token().to_string(); if api_key_value.starts_with("arn:aws") && api_key_value.contains(":secretsmanager") { - conf.api_key = crate::logs::config::get_api_key_from_secrets_manager(&aws_config, api_key_value) - .await - .map_err(|e| e.to_string())?; + conf.api_key = crate::logs::config::get_api_key_from_secrets_manager( + &aws_config, + api_key_value, + ) + .await + .map_err(|e| e.to_string())?; }; + // override config if using assume role + if let Some(role_arn) = conf.lambda_assume_role.as_ref() { + aws_clients = clients::AwsClients::new_assume_role(&aws_config, role_arn).await?; + } + let coralogix_exporter = crate::logs::set_up_coralogix_exporter(&conf)?; run(service_fn(|request: LambdaEvent| { - logs::handler( - &aws_clients, - coralogix_exporter.clone(), - &conf, - 
request, - ) + logs::handler(&aws_clients, coralogix_exporter.clone(), &conf, request) })) .await } diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 7fe5481..5c30330 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -1,17 +1,13 @@ - use crate::clients::AwsClients; +use crate::events; use async_recursion::async_recursion; use lambda_runtime::{Error, LambdaEvent}; -use crate::events; use tracing::warn; #[async_recursion] // metric telemetry handler // TODO: implement -pub async fn handler( - _: &AwsClients, - _: LambdaEvent, -) -> Result<(), Error> { +pub async fn handler(_: &AwsClients, _: LambdaEvent) -> Result<(), Error> { warn!("metrics telemetry mode not implemented"); Ok(()) } diff --git a/template.yaml b/template.yaml index c140792..272a747 100644 --- a/template.yaml +++ b/template.yaml @@ -25,7 +25,7 @@ Metadata: - kinesis - cloudfront HomePageUrl: https://coralogix.com - SemanticVersion: 1.0.14 + SemanticVersion: 1.0.15 SourceCodeUrl: https://github.com/coralogix/coralogix-aws-shipper AWS::CloudFormation::Interface: @@ -93,6 +93,9 @@ Metadata: - LambdaLogRetention - FunctionRunTime - FunctionArchitectures + - LambdaAssumeRoleARN + - ExecutionRoleARN + - ReservedConcurrentExecutions - Label: default: VPC configuration Optional Parameters: @@ -239,6 +242,11 @@ Parameters: - arm64 - x86_64 + LambdaAssumeRoleARN: + Type: String + Description: The ARN of the role that the lambda function will assume + Default: '' + SNSTopicArn: Type: String Description: The ARN for the SNS topic that contains the SNS subscription responsible for retrieving logs from Amazon S3 @@ -403,6 +411,16 @@ Parameters: # placeholder value to avoid cfn-lint issues Default: 'none' + ReservedConcurrentExecutions: + Type: Number + Description: The number of concurrent executions that are reserved for this function + Default: 0 + + ExecutionRoleARN: + Type: String + Description: The arn of a user defined role that will be used as the execution role for the lambda function + 
Default: '' + Mappings: CoralogixRegionMap: EU1: @@ -432,7 +450,6 @@ Conditions: IsSNSIntegration: !Equals [ !Ref IntegrationType, 'Sns' ] UseECRScan: !Equals [ !Ref IntegrationType, 'EcrScan' ] IsSQSIntegration: !Equals [ !Ref IntegrationType, 'Sqs' ] - IsKinesisIntegration: !Equals [ !Ref IntegrationType, 'Kinesis' ] IsNotificationEnabled: !Not [ !Equals [ !Ref NotificationEmail, '' ] ] IsCustomDomain: !Equals [ !Ref CoralogixRegion, Custom ] S3KeyPrefixIsSet: !Not [ !Equals [ !Ref S3KeyPrefix, '' ] ] @@ -462,6 +479,7 @@ Conditions: UseKinesisStreamARN: !And - !Not [ !Equals [ !Ref KinesisStreamArn, '' ] ] - !Equals [ !Ref CloudWatchLogGroupName, '' ] + - !Equals [ !Ref IntegrationType, 'Kinesis' ] UseVpcConfig: !And - !Not [ !Equals [ !Ref LambdaSubnetID, '' ] ] - !Not [ !Equals [ !Ref LambdaSecurityGroupID, '' ] ] @@ -471,6 +489,9 @@ Conditions: - !Equals [ !Ref IntegrationType, 'MSK' ] S3BucketNameIsSet: !Not [ !Equals [ !Ref S3BucketName, 'none' ] ] + IsLambdaAssumeRoleEnable: !Not [!Equals [!Ref LambdaAssumeRoleARN, '']] + ExecutionRoleARNIsSet: !Not [!Equals [!Ref ExecutionRoleARN, '']] + ReservedConcurrentExecutionsIsSet: !Not [!Equals [!Ref ReservedConcurrentExecutions, 0]] Rules: ValidateDLQ: @@ -559,146 +580,228 @@ Resources: - ":" - !Ref "AWS::StackId" - LambdaFunction: - Type: AWS::Serverless::Function - Metadata: - SamResourceId: LambdaFunction - BuildMethod: makefile + # Define the IAM Role for Lambda + LambdaExecutionRole: + Type: AWS::IAM::Role Properties: - Description: Send logs to Coralogix. - Handler: bootstrap - Runtime: !Ref FunctionRunTime - Architectures: - - !Ref FunctionArchitectures - MemorySize: !Ref FunctionMemorySize - Timeout: !Ref FunctionTimeout - CodeUri: . 
- Policies: - - !If - - S3BucketNameIsSet - - S3ReadPolicy: - BucketName: !Ref S3BucketName - - !Ref AWS::NoValue + # RoleName: "LambdaExecutionRole" + AssumeRolePolicyDocument: + Version: "2012-10-17" + Statement: + - Effect: "Allow" + Principal: + Service: "lambda.amazonaws.com" + Action: "sts:AssumeRole" + + # Define the IAM Policy + LambdaExecutionPolicy: + Type: AWS::IAM::Policy + Properties: + PolicyName: "LambdaExecutionPolicy" + Roles: !If + - ExecutionRoleARNIsSet + - - !Select [1, !Split ["/", !Select [5, !Split [":", !Ref ExecutionRoleARN]]]] + - - !Ref LambdaExecutionRole + PolicyDocument: + Version: "2012-10-17" + Statement: + + # Create and write to log group for the lambda logs + - Effect: Allow + Action: + - 'logs:CreateLogGroup' + - 'logs:CreateLogStream' + - 'logs:PutLogEvents' + Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*' + + # use privateLink + - !If + - IsPrivateLink + - Effect: Allow + Action: + - 'ec2:CreateNetworkInterface' + - 'ec2:DescribeNetworkInterfaces' + - 'ec2:DescribeVpcs' + - 'ec2:DeleteNetworkInterface' + - 'ec2:DescribeSubnets' + - 'ec2:DescribeSecurityGroups' + Resource: "*" + - !Ref AWS::NoValue - # if SQS topic ARN is set, add policy - - !If - - SQSIsSet - - SQSPollerPolicy: - QueueName: !Select - - 5 - - !Split - - ":" - - !If - - IsSQSIntegration - - !Ref SQSIntegrationTopicArn - - !Ref SQSTopicArn - - !Ref AWS::NoValue + # Assume Role Policy + - !If + - IsLambdaAssumeRoleEnable + - Effect: Allow + Action: + - 'sts:AssumeRole' + Resource: !Ref LambdaAssumeRoleARN + - !Ref AWS::NoValue - # if kafka/msk integration is set, add policy - - !If - - IsKafkaIntegration - - Statement: - - Effect: Allow - Action: - - 'ec2:CreateNetworkInterface' - - 'ec2:DescribeNetworkInterfaces' - - 'ec2:DescribeVpcs' - - 'ec2:DeleteNetworkInterface' - - 'ec2:DescribeSubnets' - - 'ec2:DescribeSecurityGroups' - Resource: '*' - - !Ref AWS::NoValue - - # if kinesis integration is set, add policy - - !If - - 
IsKinesisIntegration - - KinesisStreamReadPolicy: - StreamName: !Select [1, !Split ["/", !Select [5, !Split [":", !Ref KinesisStreamArn]]]] - - !Ref AWS::NoValue + # add SNS + - Effect: Allow + Action: + - 'sns:Publish' + Resource: !Sub 'arn:aws:sns:${AWS::Region}:${AWS::AccountId}:*' - # if msk integration is set, add policy - - !If - - UseMSK - - Statement: - - Effect: Allow - Action: - - 'kafka:DescribeCluster' - - 'kafka:GetBootstrapBrokers' - - 'kafka:DescribeClusterV2' - - 'ec2:DescribeSecurityGroups' - Resource: !Ref MSKClusterArn - - Effect: Allow - Action: - - 'ec2:CreateNetworkInterface' - - 'ec2:DescribeNetworkInterfaces' - - 'ec2:DescribeVpcs' - - 'ec2:DeleteNetworkInterface' - - 'ec2:DescribeSubnets' - - 'ec2:DescribeSecurityGroups' - Resource: '*' - - !Ref AWS::NoValue - - !If - - ApiKeyIsArn - - Statement: + + # S3 Policy + - !If + - S3BucketNameIsSet + - Effect: Allow + Action: + - 's3:GetObject' + Resource: !Sub "arn:aws:s3:::${S3BucketName}/*" + - !Ref AWS::NoValue + + # SQS Policy + - !If + - SQSIsSet + - Effect: Allow + Action: + - 'sqs:ReceiveMessage' + - 'sqs:DeleteMessage' + - 'sqs:GetQueueAttributes' + Resource: !If + - IsSQSIntegration + - !Ref SQSIntegrationTopicArn + - !Ref SQSTopicArn + - !Ref AWS::NoValue + + # Kafka Integration Policy + - !If + - IsKafkaIntegration + - Effect: Allow + Action: + - 'ec2:CreateNetworkInterface' + - 'ec2:DescribeNetworkInterfaces' + - 'ec2:DescribeVpcs' + - 'ec2:DeleteNetworkInterface' + - 'ec2:DescribeSubnets' + - 'ec2:DescribeSecurityGroups' + Resource: '*' + - !Ref AWS::NoValue + + # Kinesis Integration Policy + - !If + - UseKinesisStreamARN + - Effect: Allow + Action: + - 'kinesis:DescribeStream' + - 'kinesis:GetRecords' + - 'kinesis:GetShardIterator' + - 'kinesis:ListStreams' + - 'kinesis:DescribeStreamSummary' + - 'kinesis:ListShards' + Resource: !Ref KinesisStreamArn + - !Ref AWS::NoValue + + # MSK Integration Policy + - !If + - UseMSK + - Effect: Allow + Action: + - 'kafka:DescribeCluster' + - 
'kafka:GetBootstrapBrokers' + - 'kafka:DescribeClusterV2' + Resource: !Ref MSKClusterArn + - !Ref AWS::NoValue + + - !If + - UseMSK + - Effect: Allow + Action: + - 'ec2:CreateNetworkInterface' + - 'ec2:DescribeNetworkInterfaces' + - 'ec2:DescribeVpcs' + - 'ec2:DeleteNetworkInterface' + - 'ec2:DescribeSubnets' + - 'ec2:DescribeSecurityGroups' + Resource: "*" + - !Ref AWS::NoValue + + # Secrets Manager Access + - !If + - ApiKeyIsArn + - Effect: Allow + Action: + - 'secretsmanager:GetSecretValue' + Resource: !Ref ApiKey + + - !If + - StoreAPIKeyInSecretsManager - Effect: Allow Action: - 'secretsmanager:GetSecretValue' - Resource: !Ref ApiKey + Resource: !Ref Secret + - !Ref AWS::NoValue + + # ECR Scan Policy - !If - - StoreAPIKeyInSecretsManager - - Statement: - - Effect: Allow - Action: - - 'secretsmanager:GetSecretValue' - Resource: !Ref Secret - # Note, this is a hack to get around the fact that you can't have a condition on a policy - # If the Apikey is not an ARn or we are not storing the key in secrets manager, then we don't need access - # to secrets manager - - Statement: - - Effect: Deny - Action: - - 'secretsmanager:GetSecretValue' - Resource: '*' - - # if ecrscan is enabled, add policy - - !If - - UseECRScan - - Statement: + - UseECRScan - Effect: Allow Action: - 'ecr:DescribeImageScanFindings' Resource: !Sub 'arn:aws:ecr:${AWS::Region}:${AWS::AccountId}:repository/*' + - !Ref AWS::NoValue + + # DLQ Policy + - !If + - DLQEnabled - Effect: Allow Action: - - 'events:PutEvents' - Resource: "*" - - !Ref AWS::NoValue + - 'sqs:SendMessage' + - 'sqs:GetQueueAttributes' + - 'sqs:GetQueueUrl' + - 'sqs:ReceiveMessage' + - 'sqs:DeleteMessage' + Resource: !GetAtt DeadLetterQueue.Arn + - !Ref AWS::NoValue + - !If + - DLQEnabled + - Effect: Allow + Action: + - 's3:PutObject' + - 's3:PutObjectAcl' + - 's3:AbortMultipartUpload' + - 's3:DeleteObject' + - 's3:PutObjectTagging' + - 's3:PutObjectVersionTagging' + Resource: + - !Sub arn:aws:s3:::${DLQS3Bucket} + - !Sub 
arn:aws:s3:::${DLQS3Bucket}/* + - !Ref AWS::NoValue + + WaitAfterRoleUpdate: + Type: Custom::Waiter + DependsOn: LambdaExecutionPolicy + Properties: + ServiceToken: !GetAtt CustomResourceWaiter.Arn + Wait: 10 + + LambdaFunction: + Type: AWS::Serverless::Function + DependsOn: WaitAfterRoleUpdate + Metadata: + SamResourceId: LambdaFunction + BuildMethod: makefile + Properties: + Description: Send logs to Coralogix. + Handler: bootstrap + Runtime: !Ref FunctionRunTime + ReservedConcurrentExecutions: !If + - ReservedConcurrentExecutionsIsSet + - !Ref ReservedConcurrentExecutions + - !Ref AWS::NoValue + Architectures: + - !Ref FunctionArchitectures + MemorySize: !Ref FunctionMemorySize + Timeout: !Ref FunctionTimeout + CodeUri: . + Role: !If + - ExecutionRoleARNIsSet + - !Ref ExecutionRoleARN + - !GetAtt LambdaExecutionRole.Arn - # if dlq is enabled, add policy - - !If - - DLQEnabled - - Statement: - - Effect: Allow - Action: - - 'sqs:SendMessage' - - 'sqs:GetQueueAttributes' - - 'sqs:GetQueueUrl' - - 'sqs:ReceiveMessage' - - 'sqs:DeleteMessage' - Resource: !GetAtt DeadLetterQueue.Arn - - Effect: Allow - Action: - - 's3:PutObject' - - 's3:PutObjectAcl' - - 's3:AbortMultipartUpload' - - 's3:DeleteObject' - - 's3:PutObjectTagging' - - 's3:PutObjectVersionTagging' - Resource: - - !Sub arn:aws:s3:::${DLQS3Bucket} - - !Sub arn:aws:s3:::${DLQS3Bucket}/* - - !Ref AWS::NoValue - VpcConfig: !If - UseVpcConfig - SecurityGroupIds: @@ -765,6 +868,7 @@ Resources: - !Ref CSVDelimiter - !Ref AWS::NoValue + ASSUME_ROLE_ARN: !Ref LambdaAssumeRoleARN DLQ_RETRY_LIMIT: !Ref DLQRetryLimit DLQ_ARN: !If - DLQEnabled @@ -779,8 +883,13 @@ Resources: - DLQEnabled - !Ref DLQS3Bucket - !Ref AWS::NoValue + LAMBDA_ASSUME_ROLE: !If + - IsLambdaAssumeRoleEnable + - !Ref LambdaAssumeRoleARN + - !Ref AWS::NoValue SNSNotificationSubscription: + DependsOn: CustomResourceFunction Type: AWS::SNS::Subscription Condition: IsNotificationEnabled Properties: @@ -810,12 +919,15 @@ Resources: - !Ref 
DeadLetterQueue - !Ref AWS::NoValue Parameters: + ReservedConcurrentExecutions: !Ref ReservedConcurrentExecutions + LambdaAssumeRoleARN: !Ref LambdaAssumeRoleARN CoralogixRegion: !Ref CoralogixRegion CustomDomain: !Ref CustomDomain ApiKey: !If - StoreAPIKeyInSecretsManager - !Ref Secret - !Ref ApiKey + AssumeRoleArn: !Ref LambdaAssumeRoleARN ApplicationName: !Ref ApplicationName SubsystemName: !Ref SubsystemName AddMetadata: !Ref AddMetadata @@ -925,6 +1037,7 @@ Resources: KinesisEventSourceMapping: Condition: UseKinesisStreamARN + DependsOn: CustomResourceFunction Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 100 @@ -939,6 +1052,7 @@ Resources: MSKEventSourceMapping: Condition: UseMSK + DependsOn: CustomResourceFunction Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: !Ref KafkaBatchSize @@ -982,6 +1096,7 @@ Resources: SNSSubscription: Condition: SNSIsSet + DependsOn: CustomResourceFunction Type: AWS::SNS::Subscription Properties: Protocol: lambda @@ -993,6 +1108,7 @@ Resources: SNSEventPermission: Condition: SNSIsSet + DependsOn: CustomResourceFunction Type: AWS::Lambda::Permission Properties: Action: lambda:InvokeFunction @@ -1010,6 +1126,7 @@ Resources: LambdaFunctionEcrInvokePermission: Condition: UseECRScan + DependsOn: CustomResourceFunction Type: AWS::Lambda::Permission Properties: FunctionName: !GetAtt LambdaFunction.Arn @@ -1020,6 +1137,7 @@ Resources: ECRScanTrigger: Condition: UseECRScan + DependsOn: CustomResourceFunction Type: AWS::Events::Rule Properties: Description: "Trigger for ECR image scan completion" @@ -1048,6 +1166,12 @@ Resources: Runtime: python3.12 Timeout: 900 Policies: + - Statement: + - Sid: IAMaccess + Effect: Allow + Action: + - 'iam:*' + Resource: '*' - Statement: - Sid: EC2Access Effect: Allow @@ -1090,3 +1214,28 @@ Resources: - logs:DescribeSubscriptionFilters Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*:* CodeUri: custom-resource/ + + CustomResourceWaiter: + Type: 
AWS::Serverless::Function + Metadata: + SamResourceId: CustomResourceWaiter + Description: | + Custom Resource used to introduce a delay in the stack + creation process for specific components + Properties: + Handler: index.lambda_handler + Runtime: python3.12 + Timeout: 900 + CodeUri: custom-resource/ + InlineCode: | + import json + import time + import cfnresponse + + def lambda_handler(event, context): + print(event) + wait_secs = int(event["ResourceProperties"]["Wait"]) + if event['RequestType'] == 'Create': + time.sleep(wait_secs) + print('waited for {} seconds'.format(wait_secs)) + cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) diff --git a/tests/logs.rs b/tests/logs.rs index a0e31ce..1794832 100644 --- a/tests/logs.rs +++ b/tests/logs.rs @@ -5,9 +5,9 @@ use aws_sdk_ecr::Client as EcrClient; use aws_sdk_s3::Client as S3Client; use aws_sdk_sqs::Client as SqsClient; // use coralogix_aws_shipper::combined_event::Combined; +use coralogix_aws_shipper::clients::AwsClients; use coralogix_aws_shipper::events::Combined; use coralogix_aws_shipper::logs::config::Config; -use coralogix_aws_shipper::clients::AwsClients; use cx_sdk_core::auth::AuthData; use cx_sdk_rest_logs::model::{LogBulkRequest, LogSinglesRequest}; use cx_sdk_rest_logs::LogExporter;