Skip to content

Commit

Permalink
feat: adapted code to new client distinction
Browse files Browse the repository at this point in the history
  • Loading branch information
HugoPBrito committed Oct 15, 2024
1 parent e6ea586 commit 8683cd1
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 39 deletions.
21 changes: 14 additions & 7 deletions prowler/providers/aws/services/waf/waf_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from pydantic import BaseModel

from prowler.lib.logger import logger
Expand Down Expand Up @@ -61,7 +63,7 @@ def __init__(self, provider):
self.__threading_call__(self._get_web_acl, self.web_acls.values())

def _list_web_acls(self, regional_client):
logger.info("WAF - Listing Regional Web ACLs...")
logger.info("WAFRegional - Listing Regional Web ACLs...")
try:
for waf in regional_client.list_web_acls()["WebACLs"]:
if not self.audit_resources or (
Expand All @@ -81,7 +83,7 @@ def _list_web_acls(self, regional_client):
)

def _list_resources_for_web_acl(self, regional_client):
logger.info("WAF - Describing resources...")
logger.info("WAFRegional - Describing resources...")
try:
for acl in self.web_acls.values():
if acl.region == regional_client.region:
Expand All @@ -96,21 +98,25 @@ def _list_resources_for_web_acl(self, regional_client):
)

def _get_web_acl(self, acl):
logger.info(f"WAF - Getting Web ACL {acl.name}...")
logger.info(f"WAFRegional - Getting Web ACL {acl.name}...")
try:
get_web_acl = self.regional_clients[acl.region].get_web_acl(WebACLId=acl.id)
for rule in get_web_acl.get("WebACL", {}).get("Rules", []):
rule_id = rule.get("RuleGroupId", "")
rule_id = rule.get("RuleId", "")
if rule.get("Type", "") == "GROUP":
acl.rule_groups.append(Rule(id=rule_id))
else:
acl.rules.append(Rule(id=rule_id))
logger.info(f"Rule: {rule['Name']} - Priority: {rule['Priority']}")
except KeyError:
logger.error(f"Web ACL {acl.name} not found in {acl.region}.")

except Exception as error:
logger.error(

Check warning on line 112 in prowler/providers/aws/services/waf/waf_service.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/aws/services/waf/waf_service.py#L111-L112

Added lines #L111 - L112 were not covered by tests
f"{acl.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)


class Rule(BaseModel):
"""Rule Model for WAF and WAFRegional"""

id: str


Expand All @@ -124,3 +130,4 @@ class WebAcl(BaseModel):
region: str
rules: list[Rule] = []
rule_groups: list[Rule] = []
tags: Optional[list] = []
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.waf.waf_client import waf_client
from prowler.providers.aws.services.waf.wafregional_client import wafregional_client


class waf_webacl_has_rules_or_rule_groups(Check):
def execute(self):
findings = []
for acl in waf_client.web_acls.values():
for acl in wafregional_client.web_acls.values():
report = Check_Report_AWS(self.metadata())
report.region = acl.region
report.resource_id = acl.id
report.resource_arn = acl.arn
# report.resource_tags = acl.tags
report.resource_tags = acl.tags
report.status = "FAIL"
report.status_extended = (
f"AWS WAFv2 Web ACL {acl.id} does not have any rules or rule groups."
)
report.status_extended = f"AWS WAFRegional Web ACL {acl.id} does not have any rules or rule groups."

if acl.rules or acl.rule_groups:
report.status = "PASS"
report.status_extended = (
f"AWS WAFv2 Web ACL {acl.id} has at least one rule or rule group."
)
report.status_extended = f"AWS WAFRegional Web ACL {acl.id} has at least one rule or rule group."

findings.append(report)

Expand Down
33 changes: 29 additions & 4 deletions tests/providers/aws/services/waf/waf_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import botocore

from prowler.providers.aws.services.waf.waf_service import WAF, WAFRegional
from prowler.providers.aws.services.waf.waf_service import WAF, Rule, WAFRegional
from tests.providers.aws.utils import AWS_REGION_EU_WEST_1, set_mocked_aws_provider

# Mocking WAF-Regional Calls
# Mocking WAF Calls
make_api_call = botocore.client.BaseClient._make_api_call


Expand All @@ -23,7 +23,21 @@ def mock_make_api_call(self, operation_name, kwarg):
"alb-arn",
]
}

if operation_name == "GetWebACL":
return {
"WebACL": {
"Rules": [
{
"RuleId": "my-rule-id",
"Type": "REGULAR",
},
{
"RuleId": "my-rule-group-id",
"Type": "GROUP",
},
],
}
}
return make_api_call(self, operation_name, kwarg)


Expand All @@ -43,7 +57,6 @@ def mock_generate_regional_clients(provider, service):
new=mock_generate_regional_clients,
)
class Test_WAF_Service:

# Test WAF Service
def test_service(self):
# WAF client for this test class
Expand Down Expand Up @@ -119,3 +132,15 @@ def test_list_web_acls_waf_regional(self):
assert waf.web_acls[waf_arn].name == "my-web-acl"
assert waf.web_acls[waf_arn].region == AWS_REGION_EU_WEST_1
assert waf.web_acls[waf_arn].id == "my-web-acl-id"

# Test WAFRegional Get Web ACL
def test_get_web_acl(self):
# WAF client for this test class
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
waf = WAFRegional(aws_provider)
waf_arn = "arn:aws:waf-regional:eu-west-1:123456789012:webacl/my-web-acl-id"
assert waf.web_acls[waf_arn].name == "my-web-acl"
assert waf.web_acls[waf_arn].region == AWS_REGION_EU_WEST_1
assert waf.web_acls[waf_arn].id == "my-web-acl-id"
assert waf.web_acls[waf_arn].rules == [Rule(id="my-rule-id")]
assert waf.web_acls[waf_arn].rule_groups == [Rule(id="my-rule-group-id")]
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def mock_make_api_call_both(self, operation_name, kwarg):
class Test_waf_webacl_has_rules_or_rule_groups:
@mock_aws
def test_no_waf(self):
from prowler.providers.aws.services.waf.waf_service import WAF
from prowler.providers.aws.services.waf.waf_service import WAFRegional

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

Expand All @@ -124,8 +124,8 @@ def test_no_waf(self):
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups.waf_client",
new=WAF(aws_provider),
"prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups.wafregional_client",
new=WAFRegional(aws_provider),
):
# Test Check
from prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups import (
Expand All @@ -140,7 +140,7 @@ def test_no_waf(self):
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@mock_aws
def test_waf_no_rules_and_no_rule_group(self):
from prowler.providers.aws.services.waf.waf_service import WAF
from prowler.providers.aws.services.waf.waf_service import WAFRegional

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

Expand All @@ -149,8 +149,8 @@ def test_waf_no_rules_and_no_rule_group(self):
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups.waf_client",
new=WAF(aws_provider),
"prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups.wafregional_client",
new=WAFRegional(aws_provider),
):
# Test Check
from prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups import (
Expand All @@ -164,7 +164,7 @@ def test_waf_no_rules_and_no_rule_group(self):
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"AWS WAFv2 Web ACL {WEB_ACL_ID} does not have any rules or rule groups."
== f"AWS WAFRegional Web ACL {WEB_ACL_ID} does not have any rules or rule groups."
)
assert result[0].resource_id == WEB_ACL_ID
assert (
Expand All @@ -178,7 +178,7 @@ def test_waf_no_rules_and_no_rule_group(self):
)
@mock_aws
def test_waf_rules_and_no_rule_group(self):
from prowler.providers.aws.services.waf.waf_service import WAF
from prowler.providers.aws.services.waf.waf_service import WAFRegional

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

Expand All @@ -187,8 +187,8 @@ def test_waf_rules_and_no_rule_group(self):
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups.waf_client",
new=WAF(aws_provider),
"prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups.wafregional_client",
new=WAFRegional(aws_provider),
):
# Test Check
from prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups import (
Expand All @@ -202,7 +202,7 @@ def test_waf_rules_and_no_rule_group(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"AWS WAFv2 Web ACL {WEB_ACL_ID} has at least one rule or rule group."
== f"AWS WAFRegional Web ACL {WEB_ACL_ID} has at least one rule or rule group."
)
assert result[0].resource_id == WEB_ACL_ID
assert (
Expand All @@ -217,7 +217,7 @@ def test_waf_rules_and_no_rule_group(self):
)
@mock_aws
def test_waf_no_rules_and_rule_group(self):
from prowler.providers.aws.services.waf.waf_service import WAF
from prowler.providers.aws.services.waf.waf_service import WAFRegional

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

Expand All @@ -226,8 +226,8 @@ def test_waf_no_rules_and_rule_group(self):
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups.waf_client",
new=WAF(aws_provider),
"prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups.wafregional_client",
new=WAFRegional(aws_provider),
):
# Test Check
from prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups import (
Expand All @@ -241,7 +241,7 @@ def test_waf_no_rules_and_rule_group(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"AWS WAFv2 Web ACL {WEB_ACL_ID} has at least one rule or rule group."
== f"AWS WAFRegional Web ACL {WEB_ACL_ID} has at least one rule or rule group."
)
assert result[0].resource_id == WEB_ACL_ID
assert (
Expand All @@ -253,7 +253,7 @@ def test_waf_no_rules_and_rule_group(self):
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call_both)
@mock_aws
def test_waf_rules_and_rule_group(self):
from prowler.providers.aws.services.waf.waf_service import WAF
from prowler.providers.aws.services.waf.waf_service import WAFRegional

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

Expand All @@ -262,8 +262,8 @@ def test_waf_rules_and_rule_group(self):
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups.waf_client",
new=WAF(aws_provider),
"prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups.wafregional_client",
new=WAFRegional(aws_provider),
):
# Test Check
from prowler.providers.aws.services.waf.waf_webacl_has_rules_or_rule_groups.waf_webacl_has_rules_or_rule_groups import (
Expand All @@ -277,7 +277,7 @@ def test_waf_rules_and_rule_group(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"AWS WAFv2 Web ACL {WEB_ACL_ID} has at least one rule or rule group."
== f"AWS WAFRegional Web ACL {WEB_ACL_ID} has at least one rule or rule group."
)
assert result[0].resource_id == WEB_ACL_ID
assert (
Expand Down

0 comments on commit 8683cd1

Please sign in to comment.