diff --git a/prowler/providers/aws/services/bedrock/bedrock_guardrail_prompt_attack_filter_enabled/__init__.py b/prowler/providers/aws/services/bedrock/bedrock_guardrail_prompt_attack_filter_enabled/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/aws/services/bedrock/bedrock_guardrail_prompt_attack_filter_enabled/bedrock_guardrail_prompt_attack_filter_enabled.metadata.json b/prowler/providers/aws/services/bedrock/bedrock_guardrail_prompt_attack_filter_enabled/bedrock_guardrail_prompt_attack_filter_enabled.metadata.json new file mode 100644 index 0000000000..e6b40d35dc --- /dev/null +++ b/prowler/providers/aws/services/bedrock/bedrock_guardrail_prompt_attack_filter_enabled/bedrock_guardrail_prompt_attack_filter_enabled.metadata.json @@ -0,0 +1,32 @@ +{ + "Provider": "aws", + "CheckID": "bedrock_guardrail_prompt_attack_filter_enabled", + "CheckTitle": "Configure Prompt Attack Filter with the highest strength for Amazon Bedrock Guardrails.", + "CheckType": [], + "ServiceName": "bedrock", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:bedrock:region:account-id:guardrails/resource-id", + "Severity": "high", + "ResourceType": "Other", + "Description": "Ensure that prompt attack filter strength is set to HIGH for Amazon Bedrock guardrails to mitigate prompt injection and bypass techniques.", + "Risk": "If prompt attack filter strength is not set to HIGH, Bedrock models may be more vulnerable to prompt injection attacks or jailbreak attempts, which could allow harmful or sensitive content to bypass filters and reach end users.", + "RelatedUrl": "https://docs.aws.amazon.com/bedrock/latest/userguide/guardrails.html", + "Remediation": { + "Code": { + "CLI": "aws bedrock put-guardrails-configuration --guardrails-config 'promptAttackStrength=HIGH'", + "NativeIaC": "", + "Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/Bedrock/prompt-attack-strength.html", + "Terraform": "" + }, + "Recommendation": { + "Text": "Set the prompt attack filter strength to HIGH for Amazon Bedrock guardrails to prevent prompt injection attacks and ensure robust protection against content manipulation.", + "Url": "https://docs.aws.amazon.com/bedrock/latest/userguide/prompt-injection.html" + } + }, + "Categories": [ + "security" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "Ensure that prompt attack protection is set to the highest strength to minimize the risk of prompt injection attacks." +} diff --git a/prowler/providers/aws/services/bedrock/bedrock_guardrail_prompt_attack_filter_enabled/bedrock_guardrail_prompt_attack_filter_enabled.py b/prowler/providers/aws/services/bedrock/bedrock_guardrail_prompt_attack_filter_enabled/bedrock_guardrail_prompt_attack_filter_enabled.py new file mode 100644 index 0000000000..6f2ba89507 --- /dev/null +++ b/prowler/providers/aws/services/bedrock/bedrock_guardrail_prompt_attack_filter_enabled/bedrock_guardrail_prompt_attack_filter_enabled.py @@ -0,0 +1,24 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.bedrock.bedrock_client import bedrock_client + + +class bedrock_guardrail_prompt_attack_filter_enabled(Check): + def execute(self): + findings = [] + for guardrail in bedrock_client.guardrails.values(): + report = Check_Report_AWS(self.metadata()) + report.region = guardrail.region + report.resource_id = guardrail.id + report.resource_arn = guardrail.arn + report.resource_tags = guardrail.tags + report.status = "PASS" + report.status_extended = f"Bedrock Guardrail {guardrail.name} is configured to detect and block prompt attacks with a HIGH strength." + if not guardrail.prompt_attack_filter_strength: + report.status = "FAIL" + report.status_extended = f"Bedrock Guardrail {guardrail.name} is not configured to block prompt attacks." + elif guardrail.prompt_attack_filter_strength != "HIGH": + report.status = "FAIL" + report.status_extended = f"Bedrock Guardrail {guardrail.name} is configured to block prompt attacks but with a filter strength of {guardrail.prompt_attack_filter_strength}, not HIGH." + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/bedrock/bedrock_guardrail_sensitive_information_filter_enabled/__init__.py b/prowler/providers/aws/services/bedrock/bedrock_guardrail_sensitive_information_filter_enabled/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/aws/services/bedrock/bedrock_guardrail_sensitive_information_filter_enabled/bedrock_guardrail_sensitive_information_filter_enabled.metadata.json b/prowler/providers/aws/services/bedrock/bedrock_guardrail_sensitive_information_filter_enabled/bedrock_guardrail_sensitive_information_filter_enabled.metadata.json new file mode 100644 index 0000000000..9def1128fe --- /dev/null +++ b/prowler/providers/aws/services/bedrock/bedrock_guardrail_sensitive_information_filter_enabled/bedrock_guardrail_sensitive_information_filter_enabled.metadata.json @@ -0,0 +1,32 @@ +{ + "Provider": "aws", + "CheckID": "bedrock_guardrail_sensitive_information_filter_enabled", + "CheckTitle": "Configure Sensitive Information Filters for Amazon Bedrock Guardrails.", + "CheckType": [], + "ServiceName": "bedrock", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:bedrock:region:account-id:guardrails/resource-id", + "Severity": "high", + "ResourceType": "Other", + "Description": "Ensure that sensitive information filters are enabled for Amazon Bedrock guardrails to prevent the leakage of sensitive data such as personally identifiable information (PII), financial data, or confidential corporate information.", + "Risk": "If sensitive information filters are not enabled, Bedrock models may inadvertently generate or expose confidential or sensitive information in responses, leading to data breaches, regulatory violations, or reputational damage.", + "RelatedUrl": "https://docs.aws.amazon.com/bedrock/latest/userguide/guardrails.html", + "Remediation": { + "Code": { + "CLI": "aws bedrock put-guardrails-configuration --guardrails-config 'sensitiveInformationFilter=true'", + "NativeIaC": "", + "Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/Bedrock/guardrails-with-pii-mask-block.html", + "Terraform": "" + }, + "Recommendation": { + "Text": "Enable sensitive information filters for Amazon Bedrock guardrails to prevent the exposure of sensitive or confidential information.", + "Url": "https://docs.aws.amazon.com/bedrock/latest/userguide/guardrails-sensitive-filters.html" + } + }, + "Categories": [ + "security" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/bedrock/bedrock_guardrail_sensitive_information_filter_enabled/bedrock_guardrail_sensitive_information_filter_enabled.py b/prowler/providers/aws/services/bedrock/bedrock_guardrail_sensitive_information_filter_enabled/bedrock_guardrail_sensitive_information_filter_enabled.py new file mode 100644 index 0000000000..a758358c60 --- /dev/null +++ b/prowler/providers/aws/services/bedrock/bedrock_guardrail_sensitive_information_filter_enabled/bedrock_guardrail_sensitive_information_filter_enabled.py @@ -0,0 +1,21 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.bedrock.bedrock_client import bedrock_client + + +class bedrock_guardrail_sensitive_information_filter_enabled(Check): + def execute(self): + findings = [] + for guardrail in bedrock_client.guardrails.values(): + report = Check_Report_AWS(self.metadata()) + report.region = guardrail.region + report.resource_id = guardrail.id + report.resource_arn = guardrail.arn + report.resource_tags = guardrail.tags + report.status = "PASS" + report.status_extended = f"Bedrock Guardrail {guardrail.name} is blocking or masking sensitive information." + if not guardrail.sensitive_information_filter: + report.status = "FAIL" + report.status_extended = f"Bedrock Guardrail {guardrail.name} is not configured to block or mask sensitive information." + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/bedrock/bedrock_model_invocation_logs_encryption_enabled/__init__.py b/prowler/providers/aws/services/bedrock/bedrock_model_invocation_logs_encryption_enabled/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/aws/services/bedrock/bedrock_model_invocation_logs_encryption_enabled/bedrock_model_invocation_logs_encryption_enabled.metadata.json b/prowler/providers/aws/services/bedrock/bedrock_model_invocation_logs_encryption_enabled/bedrock_model_invocation_logs_encryption_enabled.metadata.json new file mode 100644 index 0000000000..7e5c14221b --- /dev/null +++ b/prowler/providers/aws/services/bedrock/bedrock_model_invocation_logs_encryption_enabled/bedrock_model_invocation_logs_encryption_enabled.metadata.json @@ -0,0 +1,34 @@ +{ + "Provider": "aws", + "CheckID": "bedrock_model_invocation_logs_encryption_enabled", + "CheckTitle": "Ensure that Amazon Bedrock model invocation logs are encrypted with KMS.", + "CheckType": [], + "ServiceName": "bedrock", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:bedrock:region:account-id:model/resource-id", + "Severity": "high", + "ResourceType": "Other", + "Description": "Ensure that Amazon Bedrock model invocation logs are encrypted using AWS KMS to protect sensitive data in the request and response logs for all model invocations.", + "Risk": "If Amazon Bedrock model invocation logs are not encrypted, sensitive data such as prompts, responses, and token usage could be exposed to unauthorized parties. This may lead to data breaches, security vulnerabilities, or unintended use of sensitive information.", + "RelatedUrl": "https://docs.aws.amazon.com/bedrock/latest/userguide/data-protection.html", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Ensure that model invocation logs for Amazon Bedrock are encrypted using AWS KMS to prevent unauthorized access to sensitive log data.", + "Url": "hhttps://docs.aws.amazon.com/bedrock/latest/userguide/data-protection.html" + } + }, + "Categories": [ + "encryption", + "logging", + "security" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/bedrock/bedrock_model_invocation_logs_encryption_enabled/bedrock_model_invocation_logs_encryption_enabled.py b/prowler/providers/aws/services/bedrock/bedrock_model_invocation_logs_encryption_enabled/bedrock_model_invocation_logs_encryption_enabled.py new file mode 100644 index 0000000000..7c66a80ce1 --- /dev/null +++ b/prowler/providers/aws/services/bedrock/bedrock_model_invocation_logs_encryption_enabled/bedrock_model_invocation_logs_encryption_enabled.py @@ -0,0 +1,48 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.bedrock.bedrock_client import bedrock_client +from prowler.providers.aws.services.cloudwatch.logs_client import logs_client +from prowler.providers.aws.services.s3.s3_client import s3_client + + +class bedrock_model_invocation_logs_encryption_enabled(Check): + def execute(self): + findings = [] + for region, logging in bedrock_client.logging_configurations.items(): + if logging.enabled: + s3_encryption = True + cloudwatch_encryption = True + report = Check_Report_AWS(self.metadata()) + report.region = region + report.resource_id = bedrock_client.audited_account + report.resource_arn = bedrock_client.audited_account_arn + report.status = "PASS" + report.status_extended = "Bedrock Model Invocation logs are encrypted." + if logging.s3_bucket: + bucket_arn = ( + f"arn:{s3_client.audited_partition}:s3:::{logging.s3_bucket}" + ) + if ( + bucket_arn in s3_client.buckets + and not s3_client.buckets[bucket_arn].encryption + ): + s3_encryption = False + if logging.cloudwatch_log_group: + log_group_arn = f"arn:{logs_client.audited_partition}:logs:{region}:{logs_client.audited_account}:log-group:{logging.cloudwatch_log_group}" + if ( + log_group_arn in logs_client.log_groups + and not logs_client.log_groups[log_group_arn].kms_id + ): + cloudwatch_encryption = False + if not s3_encryption and not cloudwatch_encryption: + report.status = "FAIL" + report.status_extended = f"Bedrock Model Invocation logs are not encrypted in S3 bucket: {logging.s3_bucket} and CloudWatch Log Group: {logging.cloudwatch_log_group}." + elif not s3_encryption: + report.status = "FAIL" + report.status_extended = f"Bedrock Model Invocation logs are not encrypted in S3 bucket: {logging.s3_bucket}." + elif not cloudwatch_encryption: + report.status = "FAIL" + report.status_extended = f"Bedrock Model Invocation logs are not encrypted in CloudWatch Log Group: {logging.cloudwatch_log_group}." + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/bedrock/bedrock_service.py b/prowler/providers/aws/services/bedrock/bedrock_service.py index fd180b12ec..3cf9f9dcfb 100644 --- a/prowler/providers/aws/services/bedrock/bedrock_service.py +++ b/prowler/providers/aws/services/bedrock/bedrock_service.py @@ -11,7 +11,11 @@ def __init__(self, provider): # Call AWSService's __init__ super().__init__(__class__.__name__, provider) self.logging_configurations = {} + self.guardrails = {} self.__threading_call__(self._get_model_invocation_logging_configuration) + self.__threading_call__(self._list_guardrails) + self.__threading_call__(self._get_guardrail, self.guardrails.values()) + self.__threading_call__(self._list_tags_for_resource, self.guardrails.values()) def _get_model_invocation_logging_configuration(self, regional_client): logger.info("Bedrock - Getting Model Invocation Logging Configuration...") @@ -40,8 +44,65 @@ def _get_model_invocation_logging_configuration(self, regional_client): f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) + def _list_guardrails(self, regional_client): + logger.info("Bedrock - Listing Guardrails...") + try: + for guardrail in regional_client.list_guardrails().get("guardrails", []): + self.guardrails[guardrail["arn"]] = Guardrail( + id=guardrail["id"], + name=guardrail["name"], + arn=guardrail["arn"], + region=regional_client.region, + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def _get_guardrail(self, guardrail): + logger.info("Bedrock - Getting Guardrail...") + try: + guardrail_info = self.regional_clients[guardrail.region].get_guardrail( + guardrailIdentifier=guardrail.id + ) + guardrail.sensitive_information_filter = ( + "sensitiveInformationPolicy" in guardrail_info + ) + for filter in guardrail_info.get("contentPolicy", {}).get("filters", []): + if filter.get("type") == "PROMPT_ATTACK": + guardrail.prompt_attack_filter_strength = filter.get( + "inputStrength", "NONE" + ) + except Exception as error: + logger.error( + f"{guardrail.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def _list_tags_for_resource(self, guardrail): + logger.info("Bedrock - Listing Tags for Resource...") + try: + guardrail.tags = ( + self.regional_clients[guardrail.region] + .list_tags_for_resource(resourceARN=guardrail.arn) + .get("tags", []) + ) + except Exception as error: + logger.error( + f"{guardrail.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + class LoggingConfiguration(BaseModel): enabled: bool = False cloudwatch_log_group: Optional[str] = None s3_bucket: Optional[str] = None + + +class Guardrail(BaseModel): + id: str + name: str + arn: str + region: str + tags: Optional[list] = [] + sensitive_information_filter: bool = False + prompt_attack_filter_strength: Optional[str] diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_group_kms_encryption_enabled/cloudwatch_log_group_kms_encryption_enabled.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_group_kms_encryption_enabled/cloudwatch_log_group_kms_encryption_enabled.py index 83b8bf30c0..e405a7e5d4 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_group_kms_encryption_enabled/cloudwatch_log_group_kms_encryption_enabled.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_group_kms_encryption_enabled/cloudwatch_log_group_kms_encryption_enabled.py @@ -6,7 +6,7 @@ class cloudwatch_log_group_kms_encryption_enabled(Check): def execute(self): findings = [] if logs_client.log_groups: - for log_group in logs_client.log_groups: + for log_group in logs_client.log_groups.values(): report = Check_Report_AWS(self.metadata()) report.region = log_group.region report.resource_id = log_group.name diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_group_no_secrets_in_logs/cloudwatch_log_group_no_secrets_in_logs.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_group_no_secrets_in_logs/cloudwatch_log_group_no_secrets_in_logs.py index cb583ad35b..0e350a31f7 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_group_no_secrets_in_logs/cloudwatch_log_group_no_secrets_in_logs.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_group_no_secrets_in_logs/cloudwatch_log_group_no_secrets_in_logs.py @@ -15,7 +15,7 @@ def execute(self): secrets_ignore_patterns = logs_client.audit_config.get( "secrets_ignore_patterns", [] ) - for log_group in logs_client.log_groups: + for log_group in logs_client.log_groups.values(): report = Check_Report_AWS(self.metadata()) report.status = "PASS" report.status_extended = ( diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_group_retention_policy_specific_days_enabled/cloudwatch_log_group_retention_policy_specific_days_enabled.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_group_retention_policy_specific_days_enabled/cloudwatch_log_group_retention_policy_specific_days_enabled.py index 342770dc8a..b01ba2363c 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_group_retention_policy_specific_days_enabled/cloudwatch_log_group_retention_policy_specific_days_enabled.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_group_retention_policy_specific_days_enabled/cloudwatch_log_group_retention_policy_specific_days_enabled.py @@ -11,7 +11,7 @@ def execute(self): "log_group_retention_days", 365 ) if logs_client.log_groups: - for log_group in logs_client.log_groups: + for log_group in logs_client.log_groups.values(): report = Check_Report_AWS(self.metadata()) report.region = log_group.region report.resource_id = log_group.name diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_service.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_service.py index d7c9d3fd9e..124b612351 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_service.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_service.py @@ -82,7 +82,7 @@ def __init__(self, provider): # Call AWSService's __init__ super().__init__(__class__.__name__, provider) self.log_group_arn_template = f"arn:{self.audited_partition}:logs:{self.region}:{self.audited_account}:log-group" - self.log_groups = [] + self.log_groups = {} self.__threading_call__(self._describe_log_groups) self.metric_filters = [] self.__threading_call__(self._describe_metric_filters) @@ -95,7 +95,9 @@ def __init__(self, provider): 1000 # The threshold for number of events to return per log group. ) self.__threading_call__(self._get_log_events) - self.__threading_call__(self._list_tags_for_resource, self.log_groups) + self.__threading_call__( + self._list_tags_for_resource, self.log_groups.values() + ) def _describe_metric_filters(self, regional_client): logger.info("CloudWatch Logs - Describing metric filters...") @@ -113,7 +115,7 @@ def _describe_metric_filters(self, regional_client): self.metric_filters = [] log_group = None - for lg in self.log_groups: + for lg in self.log_groups.values(): if lg.name == filter["logGroupName"]: log_group = lg break @@ -162,16 +164,14 @@ def _describe_log_groups(self, regional_client): never_expire = True retention_days = 9999 if self.log_groups is None: - self.log_groups = [] - self.log_groups.append( - LogGroup( - arn=log_group["arn"], - name=log_group["logGroupName"], - retention_days=retention_days, - never_expire=never_expire, - kms_id=kms, - region=regional_client.region, - ) + self.log_groups = {} + self.log_groups[log_group["arn"]] = LogGroup( + arn=log_group["arn"], + name=log_group["logGroupName"], + retention_days=retention_days, + never_expire=never_expire, + kms_id=kms, + region=regional_client.region, ) except ClientError as error: if error.response["Error"]["Code"] == "AccessDeniedException": @@ -192,7 +192,7 @@ def _describe_log_groups(self, regional_client): def _get_log_events(self, regional_client): regional_log_groups = [ log_group - for log_group in self.log_groups + for log_group in self.log_groups.values() if log_group.region == regional_client.region ] total_log_groups = len(regional_log_groups) diff --git a/tests/providers/aws/services/bedrock/bedrock_guardrail_prompt_attack_filter_enabled/bedrock_guardrail_prompt_attack_filter_enabled_test.py b/tests/providers/aws/services/bedrock/bedrock_guardrail_prompt_attack_filter_enabled/bedrock_guardrail_prompt_attack_filter_enabled_test.py new file mode 100644 index 0000000000..4f5ed8a2a9 --- /dev/null +++ b/tests/providers/aws/services/bedrock/bedrock_guardrail_prompt_attack_filter_enabled/bedrock_guardrail_prompt_attack_filter_enabled_test.py @@ -0,0 +1,236 @@ +from unittest import mock + +import botocore +from moto import mock_aws + +from tests.providers.aws.utils import ( + AWS_ACCOUNT_NUMBER, + AWS_REGION_EU_WEST_1, + AWS_REGION_US_EAST_1, + set_mocked_aws_provider, +) + +make_api_call = botocore.client.BaseClient._make_api_call + +GUARDRAIL_ARN = ( + f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:guardrail/test-id" +) + + +def mock_make_api_call_high_filter(self, operation_name, kwarg): + if operation_name == "ListGuardrails": + return { + "guardrails": [ + { + "id": "test-id", + "arn": GUARDRAIL_ARN, + "status": "READY", + "name": "test", + } + ] + } + elif operation_name == "GetGuardrail": + return { + "name": "test", + "guardrailId": "test-id", + "guardrailArn": GUARDRAIL_ARN, + "status": "READY", + "contentPolicy": { + "filters": [ + { + "type": "PROMPT_ATTACK", + "inputStrength": "HIGH", + "outputStrength": "NONE", + }, + ] + }, + "blockedInputMessaging": "Sorry, the model cannot answer this question.", + "blockedOutputsMessaging": "Sorry, the model cannot answer this question.", + } + return make_api_call(self, operation_name, kwarg) + + +def mock_make_api_call_no_filter(self, operation_name, kwarg): + if operation_name == "ListGuardrails": + return { + "guardrails": [ + { + "id": "test-id", + "arn": GUARDRAIL_ARN, + "status": "READY", + "name": "test", + } + ] + } + elif operation_name == "GetGuardrail": + return { + "name": "test", + "guardrailId": "test-id", + "guardrailArn": GUARDRAIL_ARN, + "status": "READY", + "contentPolicy": {"filters": []}, + "blockedInputMessaging": "Sorry, the model cannot answer this question.", + "blockedOutputsMessaging": "Sorry, the model cannot answer this question.", + } + return make_api_call(self, operation_name, kwarg) + + +def mock_make_api_call_low_filter(self, operation_name, kwarg): + if operation_name == "ListGuardrails": + return { + "guardrails": [ + { + "id": "test-id", + "arn": GUARDRAIL_ARN, + "status": "READY", + "name": "test", + } + ] + } + elif operation_name == "GetGuardrail": + return { + "name": "test", + "guardrailId": "test-id", + "guardrailArn": GUARDRAIL_ARN, + "status": "READY", + "contentPolicy": { + "filters": [ + { + "type": "PROMPT_ATTACK", + "inputStrength": "LOW", + "outputStrength": "NONE", + }, + ] + }, + "blockedInputMessaging": "Sorry, the model cannot answer this question.", + "blockedOutputsMessaging": "Sorry, the model cannot answer this question.", + } + return make_api_call(self, operation_name, kwarg) + + +class Test_bedrock_guardrail_prompt_attack_filter_enabled: + @mock_aws + def test_no_guardrails(self): + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1] + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_guardrail_prompt_attack_filter_enabled.bedrock_guardrail_prompt_attack_filter_enabled.bedrock_client", + new=Bedrock(aws_provider), + ): + from prowler.providers.aws.services.bedrock.bedrock_guardrail_prompt_attack_filter_enabled.bedrock_guardrail_prompt_attack_filter_enabled import ( + bedrock_guardrail_prompt_attack_filter_enabled, + ) + + check = bedrock_guardrail_prompt_attack_filter_enabled() + result = check.execute() + + assert len(result) == 0 + + @mock.patch( + "botocore.client.BaseClient._make_api_call", new=mock_make_api_call_high_filter + ) + @mock_aws + def test_guardrail_high_filter(self): + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_guardrail_prompt_attack_filter_enabled.bedrock_guardrail_prompt_attack_filter_enabled.bedrock_client", + new=Bedrock(aws_provider), + ): + from prowler.providers.aws.services.bedrock.bedrock_guardrail_prompt_attack_filter_enabled.bedrock_guardrail_prompt_attack_filter_enabled import ( + bedrock_guardrail_prompt_attack_filter_enabled, + ) + + check = bedrock_guardrail_prompt_attack_filter_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Bedrock Guardrail test is configured to detect and block prompt attacks with a HIGH strength." + ) + assert result[0].resource_id == "test-id" + assert result[0].resource_arn == GUARDRAIL_ARN + assert result[0].region == AWS_REGION_US_EAST_1 + assert result[0].resource_tags == [] + + @mock.patch( + "botocore.client.BaseClient._make_api_call", new=mock_make_api_call_no_filter + ) + @mock_aws + def test_guardrail_no_filter(self): + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_guardrail_prompt_attack_filter_enabled.bedrock_guardrail_prompt_attack_filter_enabled.bedrock_client", + new=Bedrock(aws_provider), + ): + from prowler.providers.aws.services.bedrock.bedrock_guardrail_prompt_attack_filter_enabled.bedrock_guardrail_prompt_attack_filter_enabled import ( + bedrock_guardrail_prompt_attack_filter_enabled, + ) + + check = bedrock_guardrail_prompt_attack_filter_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Bedrock Guardrail test is not configured to block prompt attacks." + ) + assert result[0].resource_id == "test-id" + assert result[0].resource_arn == GUARDRAIL_ARN + assert result[0].region == AWS_REGION_US_EAST_1 + assert result[0].resource_tags == [] + + @mock.patch( + "botocore.client.BaseClient._make_api_call", new=mock_make_api_call_low_filter + ) + @mock_aws + def test_guardrail_low_filter(self): + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_guardrail_prompt_attack_filter_enabled.bedrock_guardrail_prompt_attack_filter_enabled.bedrock_client", + new=Bedrock(aws_provider), + ): + from prowler.providers.aws.services.bedrock.bedrock_guardrail_prompt_attack_filter_enabled.bedrock_guardrail_prompt_attack_filter_enabled import ( + bedrock_guardrail_prompt_attack_filter_enabled, + ) + + check = bedrock_guardrail_prompt_attack_filter_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Bedrock Guardrail test is configured to block prompt attacks but with a filter strength of LOW, not HIGH." + ) + assert result[0].resource_id == "test-id" + assert result[0].resource_arn == GUARDRAIL_ARN + assert result[0].region == AWS_REGION_US_EAST_1 + assert result[0].resource_tags == [] diff --git a/tests/providers/aws/services/bedrock/bedrock_guardrail_sensitive_information_filter_enabled/bedrock_guardrail_sensitive_information_filter_enabled_test.py b/tests/providers/aws/services/bedrock/bedrock_guardrail_sensitive_information_filter_enabled/bedrock_guardrail_sensitive_information_filter_enabled_test.py new file mode 100644 index 0000000000..b33eb6ffbc --- /dev/null +++ b/tests/providers/aws/services/bedrock/bedrock_guardrail_sensitive_information_filter_enabled/bedrock_guardrail_sensitive_information_filter_enabled_test.py @@ -0,0 +1,161 @@ +from unittest import mock + +import botocore +from moto import mock_aws + +from tests.providers.aws.utils import ( + AWS_ACCOUNT_NUMBER, + AWS_REGION_EU_WEST_1, + AWS_REGION_US_EAST_1, + set_mocked_aws_provider, +) + +make_api_call = botocore.client.BaseClient._make_api_call + +GUARDRAIL_ARN = ( + f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:guardrail/test-id" +) + + +def mock_make_api_call_no_filter(self, operation_name, kwarg): + if operation_name == "ListGuardrails": + return { + "guardrails": [ + { + "id": "test-id", + "arn": GUARDRAIL_ARN, + "status": "READY", + "name": "test", + } + ] + } + elif operation_name == "GetGuardrail": + return { + "name": "test", + "guardrailId": "test-id", + "guardrailArn": GUARDRAIL_ARN, + "status": "READY", + "blockedInputMessaging": "Sorry, the model cannot answer this question.", + "blockedOutputsMessaging": "Sorry, the model cannot answer this question.", + } + return make_api_call(self, operation_name, kwarg) + + +def mock_make_api_call_with_filter(self, operation_name, kwarg): + if operation_name == "ListGuardrails": + return { + "guardrails": [ + { + "id": "test-id", + "arn": GUARDRAIL_ARN, + "status": "READY", + "name": "test", + } + ] + } + elif operation_name == "GetGuardrail": + return { + "name": "test", + "guardrailId": "test-id", + "guardrailArn": GUARDRAIL_ARN, + "status": "READY", + "sensitiveInformationPolicy": True, + "contentPolicy": {"filters": []}, + "blockedInputMessaging": "Sorry, the model cannot answer this question.", + "blockedOutputsMessaging": "Sorry, the model cannot answer this question.", + } + return make_api_call(self, operation_name, kwarg) + + +class Test_bedrock_guardrail_sensitive_information_filter_enabled: + @mock_aws + def test_no_guardrails(self): + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1] + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_guardrail_sensitive_information_filter_enabled.bedrock_guardrail_sensitive_information_filter_enabled.bedrock_client", + new=Bedrock(aws_provider), + ): + from prowler.providers.aws.services.bedrock.bedrock_guardrail_sensitive_information_filter_enabled.bedrock_guardrail_sensitive_information_filter_enabled import ( + bedrock_guardrail_sensitive_information_filter_enabled, + ) + + check = bedrock_guardrail_sensitive_information_filter_enabled() + result = check.execute() + + assert len(result) == 0 + + @mock.patch( + "botocore.client.BaseClient._make_api_call", new=mock_make_api_call_no_filter + ) + @mock_aws + def test_guardrail_no_filter(self): + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_guardrail_sensitive_information_filter_enabled.bedrock_guardrail_sensitive_information_filter_enabled.bedrock_client", + new=Bedrock(aws_provider), + ): + from prowler.providers.aws.services.bedrock.bedrock_guardrail_sensitive_information_filter_enabled.bedrock_guardrail_sensitive_information_filter_enabled import ( + bedrock_guardrail_sensitive_information_filter_enabled, + ) + + check = bedrock_guardrail_sensitive_information_filter_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Bedrock Guardrail test is not configured to block or mask sensitive information." + ) + assert result[0].resource_id == "test-id" + assert result[0].resource_arn == GUARDRAIL_ARN + assert result[0].region == AWS_REGION_US_EAST_1 + assert result[0].resource_tags == [] + + @mock.patch( + "botocore.client.BaseClient._make_api_call", new=mock_make_api_call_with_filter + ) + @mock_aws + def test_guardrail_with_filter(self): + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_guardrail_sensitive_information_filter_enabled.bedrock_guardrail_sensitive_information_filter_enabled.bedrock_client", + new=Bedrock(aws_provider), + ): + from prowler.providers.aws.services.bedrock.bedrock_guardrail_sensitive_information_filter_enabled.bedrock_guardrail_sensitive_information_filter_enabled import ( + bedrock_guardrail_sensitive_information_filter_enabled, + ) + + check = bedrock_guardrail_sensitive_information_filter_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Bedrock Guardrail test is blocking or masking sensitive information." + ) + assert result[0].resource_id == "test-id" + assert result[0].resource_arn == GUARDRAIL_ARN + assert result[0].region == AWS_REGION_US_EAST_1 + assert result[0].resource_tags == [] diff --git a/tests/providers/aws/services/bedrock/bedrock_model_invocation_logging_enabled/bedrock_model_invocation_logging_enabled_test.py b/tests/providers/aws/services/bedrock/bedrock_model_invocation_logging_enabled/bedrock_model_invocation_logging_enabled_test.py index 464cac1ddb..9d934f69bc 100644 --- a/tests/providers/aws/services/bedrock/bedrock_model_invocation_logging_enabled/bedrock_model_invocation_logging_enabled_test.py +++ b/tests/providers/aws/services/bedrock/bedrock_model_invocation_logging_enabled/bedrock_model_invocation_logging_enabled_test.py @@ -28,7 +28,6 @@ def test_no_loggings(self): "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logging_enabled.bedrock_model_invocation_logging_enabled.bedrock_client", new=Bedrock(aws_provider), ): - # Test Check from prowler.providers.aws.services.bedrock.bedrock_model_invocation_logging_enabled.bedrock_model_invocation_logging_enabled import ( bedrock_model_invocation_logging_enabled, ) diff --git a/tests/providers/aws/services/bedrock/bedrock_model_invocation_logs_encryption_enabled/bedrock_model_invocation_logs_encryption_enabled_test.py b/tests/providers/aws/services/bedrock/bedrock_model_invocation_logs_encryption_enabled/bedrock_model_invocation_logs_encryption_enabled_test.py new file mode 100644 index 0000000000..d215b62b9a --- /dev/null +++ b/tests/providers/aws/services/bedrock/bedrock_model_invocation_logs_encryption_enabled/bedrock_model_invocation_logs_encryption_enabled_test.py @@ -0,0 +1,391 @@ +from unittest import mock + +from boto3 import client +from moto import mock_aws + +from tests.providers.aws.utils import ( + AWS_ACCOUNT_ARN, + AWS_ACCOUNT_NUMBER, + AWS_REGION_US_EAST_1, + set_mocked_aws_provider, +) + + +class Test_bedrock_model_invocation_logs_encryption_enabled: + @mock_aws + def test_no_logging(self): + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + from prowler.providers.aws.services.cloudwatch.cloudwatch_service import Logs + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.bedrock_client", + new=Bedrock(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.logs_client", + new=Logs(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.s3_client", + new=S3(aws_provider), + ): + from prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled import ( + bedrock_model_invocation_logs_encryption_enabled, + ) + + check = bedrock_model_invocation_logs_encryption_enabled() + result = check.execute() + + assert len(result) == 0 + + @mock_aws + def test_s3_and_cloudwatch_logging_not_encrypted(self): + logs_client = client("logs", region_name=AWS_REGION_US_EAST_1) + logs_client.create_log_group(logGroupName="Test") + s3_client = client("s3", region_name=AWS_REGION_US_EAST_1) + s3_client.create_bucket(Bucket="testconfigbucket") + bedrock = client("bedrock", region_name=AWS_REGION_US_EAST_1) + + logging_config = { + "cloudWatchConfig": { + "logGroupName": "Test", + "roleArn": "testrole", + "largeDataDeliveryS3Config": { + "bucketName": "testbucket", + }, + }, + "s3Config": { + "bucketName": "testconfigbucket", + }, + } + bedrock.put_model_invocation_logging_configuration(loggingConfig=logging_config) + + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + from prowler.providers.aws.services.cloudwatch.cloudwatch_service import Logs + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.bedrock_client", + new=Bedrock(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.logs_client", + new=Logs(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.s3_client", + new=S3(aws_provider), + ): + from prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled import ( + bedrock_model_invocation_logs_encryption_enabled, + ) + + check = bedrock_model_invocation_logs_encryption_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Bedrock Model Invocation logs are not encrypted in S3 bucket: testconfigbucket and CloudWatch Log Group: Test." + ) + assert result[0].resource_id == AWS_ACCOUNT_NUMBER + assert result[0].resource_arn == AWS_ACCOUNT_ARN + assert result[0].region == AWS_REGION_US_EAST_1 + assert result[0].resource_tags == [] + + @mock_aws + def test_s3_logging_not_encrypted(self): + s3_client = client("s3", region_name=AWS_REGION_US_EAST_1) + s3_client.create_bucket(Bucket="testconfigbucket") + bedrock = client("bedrock", region_name=AWS_REGION_US_EAST_1) + + logging_config = { + "s3Config": { + "bucketName": "testconfigbucket", + }, + } + bedrock.put_model_invocation_logging_configuration(loggingConfig=logging_config) + + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + from prowler.providers.aws.services.cloudwatch.cloudwatch_service import Logs + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.bedrock_client", + new=Bedrock(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.logs_client", + new=Logs(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.s3_client", + new=S3(aws_provider), + ): + from prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled import ( + bedrock_model_invocation_logs_encryption_enabled, + ) + + check = bedrock_model_invocation_logs_encryption_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Bedrock Model Invocation logs are not encrypted in S3 bucket: testconfigbucket." + ) + assert result[0].resource_id == AWS_ACCOUNT_NUMBER + assert result[0].resource_arn == AWS_ACCOUNT_ARN + assert result[0].region == AWS_REGION_US_EAST_1 + assert result[0].resource_tags == [] + + @mock_aws + def test_cloudwatch_logging_not_encrypted(self): + logs_client = client("logs", region_name=AWS_REGION_US_EAST_1) + logs_client.create_log_group(logGroupName="Test") + bedrock = client("bedrock", region_name=AWS_REGION_US_EAST_1) + + logging_config = { + "cloudWatchConfig": { + "logGroupName": "Test", + "roleArn": "testrole", + }, + } + bedrock.put_model_invocation_logging_configuration(loggingConfig=logging_config) + + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + from prowler.providers.aws.services.cloudwatch.cloudwatch_service import Logs + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.bedrock_client", + new=Bedrock(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.logs_client", + new=Logs(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.s3_client", + new=S3(aws_provider), + ): + + from prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled import ( + bedrock_model_invocation_logs_encryption_enabled, + ) + + check = bedrock_model_invocation_logs_encryption_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Bedrock Model Invocation logs are not encrypted in CloudWatch Log Group: Test." + ) + assert result[0].resource_id == AWS_ACCOUNT_NUMBER + assert result[0].resource_arn == AWS_ACCOUNT_ARN + assert result[0].region == AWS_REGION_US_EAST_1 + assert result[0].resource_tags == [] + + @mock_aws + def test_s3_and_cloudwatch_logging_encrypted(self): + logs_client = client("logs", region_name=AWS_REGION_US_EAST_1) + logs_client.create_log_group(logGroupName="Test", kmsKeyId="testkey") + s3_client = client("s3", region_name=AWS_REGION_US_EAST_1) + s3_client.create_bucket(Bucket="testconfigbucket") + sse_config = { + "Rules": [ + { + "ApplyServerSideEncryptionByDefault": { + "SSEAlgorithm": "AES256", + } + } + ] + } + + s3_client.put_bucket_encryption( + Bucket="testconfigbucket", ServerSideEncryptionConfiguration=sse_config + ) + bedrock = client("bedrock", region_name=AWS_REGION_US_EAST_1) + + logging_config = { + "cloudWatchConfig": { + "logGroupName": "Test", + "roleArn": "testrole", + "largeDataDeliveryS3Config": { + "bucketName": "testbucket", + }, + }, + "s3Config": { + "bucketName": "testconfigbucket", + }, + } + bedrock.put_model_invocation_logging_configuration(loggingConfig=logging_config) + + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + from prowler.providers.aws.services.cloudwatch.cloudwatch_service import Logs + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.bedrock_client", + new=Bedrock(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.logs_client", + new=Logs(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.s3_client", + new=S3(aws_provider), + ): + from prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled import ( + bedrock_model_invocation_logs_encryption_enabled, + ) + + check = bedrock_model_invocation_logs_encryption_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Bedrock Model Invocation logs are encrypted." + ) + assert result[0].resource_id == AWS_ACCOUNT_NUMBER + assert result[0].resource_arn == AWS_ACCOUNT_ARN + assert result[0].region == AWS_REGION_US_EAST_1 + assert result[0].resource_tags == [] + + @mock_aws + def test_s3_logging_encrypted(self): + s3_client = client("s3", region_name=AWS_REGION_US_EAST_1) + s3_client.create_bucket(Bucket="testconfigbucket") + sse_config = { + "Rules": [ + { + "ApplyServerSideEncryptionByDefault": { + "SSEAlgorithm": "AES256", + } + } + ] + } + + s3_client.put_bucket_encryption( + Bucket="testconfigbucket", ServerSideEncryptionConfiguration=sse_config + ) + bedrock = client("bedrock", region_name=AWS_REGION_US_EAST_1) + + logging_config = { + "s3Config": { + "bucketName": "testconfigbucket", + }, + } + bedrock.put_model_invocation_logging_configuration(loggingConfig=logging_config) + + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + from prowler.providers.aws.services.cloudwatch.cloudwatch_service import Logs + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.bedrock_client", + new=Bedrock(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.logs_client", + new=Logs(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.s3_client", + new=S3(aws_provider), + ): + from prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled import ( + bedrock_model_invocation_logs_encryption_enabled, + ) + + check = bedrock_model_invocation_logs_encryption_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Bedrock Model Invocation logs are encrypted." + ) + assert result[0].resource_id == AWS_ACCOUNT_NUMBER + assert result[0].resource_arn == AWS_ACCOUNT_ARN + assert result[0].region == AWS_REGION_US_EAST_1 + assert result[0].resource_tags == [] + + @mock_aws + def test_cloudwatch_logging_encrypted(self): + logs_client = client("logs", region_name=AWS_REGION_US_EAST_1) + logs_client.create_log_group(logGroupName="Test", kmsKeyId="testkey") + bedrock = client("bedrock", region_name=AWS_REGION_US_EAST_1) + + logging_config = { + "cloudWatchConfig": { + "logGroupName": "Test", + "roleArn": "testrole", + }, + } + bedrock.put_model_invocation_logging_configuration(loggingConfig=logging_config) + + from prowler.providers.aws.services.bedrock.bedrock_service import Bedrock + from prowler.providers.aws.services.cloudwatch.cloudwatch_service import Logs + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.bedrock_client", + new=Bedrock(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.logs_client", + new=Logs(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled.s3_client", + new=S3(aws_provider), + ): + + from prowler.providers.aws.services.bedrock.bedrock_model_invocation_logs_encryption_enabled.bedrock_model_invocation_logs_encryption_enabled import ( + bedrock_model_invocation_logs_encryption_enabled, + ) + + check = bedrock_model_invocation_logs_encryption_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Bedrock Model Invocation logs are encrypted." + ) + assert result[0].resource_id == AWS_ACCOUNT_NUMBER + assert result[0].resource_arn == AWS_ACCOUNT_ARN + assert result[0].region == AWS_REGION_US_EAST_1 + assert result[0].resource_tags == [] diff --git a/tests/providers/aws/services/bedrock/bedrock_service_test.py b/tests/providers/aws/services/bedrock/bedrock_service_test.py index 96caf3d859..fceccd9e7a 100644 --- a/tests/providers/aws/services/bedrock/bedrock_service_test.py +++ b/tests/providers/aws/services/bedrock/bedrock_service_test.py @@ -1,3 +1,6 @@ +from unittest import mock + +import botocore from boto3 import client from moto import mock_aws @@ -9,6 +12,52 @@ set_mocked_aws_provider, ) +make_api_call = botocore.client.BaseClient._make_api_call + +GUARDRAIL_ARN = ( + f"arn:aws:bedrock:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:guardrail/test-id" +) + + +def mock_make_api_call(self, operation_name, kwarg): + if operation_name == "ListGuardrails": + return { + "guardrails": [ + { + "id": "test-id", + "arn": GUARDRAIL_ARN, + "status": "READY", + "name": "test", + } + ] + } + elif operation_name == "GetGuardrail": + return { + "name": "test", + "guardrailId": "test-id", + "guardrailArn": GUARDRAIL_ARN, + "status": "READY", + "contentPolicy": { + "filters": [ + { + "type": "PROMPT_ATTACK", + "inputStrength": "HIGH", + "outputStrength": "NONE", + }, + ] + }, + "sensitiveInformationPolicy": True, + "blockedInputMessaging": "Sorry, the model cannot answer this question.", + "blockedOutputsMessaging": "Sorry, the model cannot answer this question.", + } + elif operation_name == "ListTagsForResource": + return { + "tags": [ + {"Key": "Name", "Value": "test"}, + ] + } + return make_api_call(self, operation_name, kwarg) + class Test_Bedrock_Service: @mock_aws @@ -77,3 +126,31 @@ def test_get_model_invocation_logging_configuration(self): == "testconfigbucket" ) assert not bedrock.logging_configurations[AWS_REGION_US_EAST_1].enabled + + @mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) + @mock_aws + def test_list_guardrails(self): + aws_provider = set_mocked_aws_provider(audited_regions=[AWS_REGION_US_EAST_1]) + bedrock = Bedrock(aws_provider) + assert len(bedrock.guardrails) == 1 + assert GUARDRAIL_ARN in bedrock.guardrails + assert bedrock.guardrails[GUARDRAIL_ARN].id == "test-id" + assert bedrock.guardrails[GUARDRAIL_ARN].name == "test" + assert bedrock.guardrails[GUARDRAIL_ARN].region == AWS_REGION_US_EAST_1 + + @mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) + @mock_aws + def test_get_guardrail(self): + aws_provider = set_mocked_aws_provider(audited_regions=[AWS_REGION_US_EAST_1]) + bedrock = Bedrock(aws_provider) + assert bedrock.guardrails[GUARDRAIL_ARN].sensitive_information_filter + assert bedrock.guardrails[GUARDRAIL_ARN].prompt_attack_filter_strength == "HIGH" + + @mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) + @mock_aws + def test_list_tags_for_resource(self): + aws_provider = set_mocked_aws_provider(audited_regions=[AWS_REGION_US_EAST_1]) + bedrock = Bedrock(aws_provider) + assert bedrock.guardrails[GUARDRAIL_ARN].tags == [ + {"Key": "Name", "Value": "test"} + ] diff --git a/tests/providers/aws/services/cloudwatch/cloudwatch_service_test.py b/tests/providers/aws/services/cloudwatch/cloudwatch_service_test.py index e175dfd65e..d857102f43 100644 --- a/tests/providers/aws/services/cloudwatch/cloudwatch_service_test.py +++ b/tests/providers/aws/services/cloudwatch/cloudwatch_service_test.py @@ -178,18 +178,16 @@ def test_describe_log_groups(self): aws_provider = set_mocked_aws_provider( expected_checks=["cloudwatch_log_group_no_secrets_in_logs"] ) + arn = f"arn:aws:logs:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:log-group:/log-group/test" logs = Logs(aws_provider) assert len(logs.log_groups) == 1 - assert ( - logs.log_groups[0].arn - == f"arn:aws:logs:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:log-group:/log-group/test" - ) - assert logs.log_groups[0].name == "/log-group/test" - assert logs.log_groups[0].retention_days == 400 - assert logs.log_groups[0].kms_id == "test_kms_id" - assert not logs.log_groups[0].never_expire - assert logs.log_groups[0].region == AWS_REGION_US_EAST_1 - assert logs.log_groups[0].tags == [ + assert arn in logs.log_groups + assert logs.log_groups[arn].name == "/log-group/test" + assert logs.log_groups[arn].retention_days == 400 + assert logs.log_groups[arn].kms_id == "test_kms_id" + assert not logs.log_groups[arn].never_expire + assert logs.log_groups[arn].region == AWS_REGION_US_EAST_1 + assert logs.log_groups[arn].tags == [ {"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2"} ] @@ -206,18 +204,16 @@ def test_describe_log_groupsnever_expire(self): aws_provider = set_mocked_aws_provider( expected_checks=["cloudwatch_log_group_no_secrets_in_logs"] ) + arn = f"arn:aws:logs:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:log-group:/log-group/test" logs = Logs(aws_provider) assert len(logs.log_groups) == 1 - assert ( - logs.log_groups[0].arn - == f"arn:aws:logs:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:log-group:/log-group/test" - ) - assert logs.log_groups[0].name == "/log-group/test" - assert logs.log_groups[0].never_expire + assert arn in logs.log_groups + assert logs.log_groups[arn].name == "/log-group/test" + assert logs.log_groups[arn].never_expire # Since it never expires we don't use the retention_days - assert logs.log_groups[0].retention_days == 9999 - assert logs.log_groups[0].kms_id == "test_kms_id" - assert logs.log_groups[0].region == AWS_REGION_US_EAST_1 - assert logs.log_groups[0].tags == [ + assert logs.log_groups[arn].retention_days == 9999 + assert logs.log_groups[arn].kms_id == "test_kms_id" + assert logs.log_groups[arn].region == AWS_REGION_US_EAST_1 + assert logs.log_groups[arn].tags == [ {"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2"} ]