Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(waf): add new check waf_regional_rule_with_conditions #5411

Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "waf_rule_has_conditions",
sergargar marked this conversation as resolved.
Show resolved Hide resolved
"CheckTitle": "AWS WAF Classic Regional Rules Should Have at Least One Condition.",
"CheckType": [
"Software and Configuration Checks/Industry and Regulatory Standards/NIST 800-53 Controls"
],
"ServiceName": "waf",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:waf-regional:region:account-id:rule/rule-id",
"Severity": "medium",
"ResourceType": "AwsWafRegionalRule",
"Description": "Ensure that every AWS WAF Classic Regional Rule contains at least one condition.",
"Risk": "An AWS WAF Classic Regional rule without any conditions cannot inspect or filter traffic, potentially allowing malicious requests to pass unchecked.",
"RelatedUrl": "https://docs.aws.amazon.com/config/latest/developerguide/waf-regional-rule-not-empty.html",
"Remediation": {
"Code": {
"CLI": "aws waf-regional update-rule --rule-id <rule-id> --change-token <change-token> --updates <updates>",
sergargar marked this conversation as resolved.
Show resolved Hide resolved
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/waf-controls.html#waf-2",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure that every AWS WAF Classic Regional rule has at least one condition to properly inspect and manage web traffic.",
"Url": "https://docs.aws.amazon.com/waf/latest/developerguide/classic-web-acl-rules-editing.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.waf.wafregional_client import wafregional_client


class waf_rule_has_conditions(Check):
def execute(self):
findings = []
for rule in wafregional_client.rules.values():
report = Check_Report_AWS(self.metadata())
report.region = rule.region
report.resource_id = rule.id
report.resource_arn = rule.arn
report.resource_tags = rule.tags
report.status = "FAIL"
report.status_extended = f"AWS WAFRegional Classic Regional Rule {rule.id} does not have any conditions."
sergargar marked this conversation as resolved.
Show resolved Hide resolved

if rule.predicates:
report.status = "PASS"
report.status_extended = f"AWS WAFRegional Classic Regional Rule {rule.id} has at least one condition."
sergargar marked this conversation as resolved.
Show resolved Hide resolved

findings.append(report)

return findings
79 changes: 77 additions & 2 deletions prowler/providers/aws/services/waf/waf_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from pydantic import BaseModel

from prowler.lib.logger import logger
Expand Down Expand Up @@ -56,11 +58,15 @@ def __init__(self, provider):
# Call AWSService's __init__
super().__init__("waf-regional", provider)
self.web_acls = {}
self.rules = {}
self.__threading_call__(self._list_web_acls)
self.__threading_call__(self._list_resources_for_web_acl)
self.__threading_call__(self._get_web_acl, self.web_acls.values())
self.__threading_call__(self._list_rules)
self.__threading_call__(self._get_rule, self.rules.values())

def _list_web_acls(self, regional_client):
logger.info("WAF - Listing Regional Web ACLs...")
logger.info("WAFRegional - Listing Regional Web ACLs...")
try:
for waf in regional_client.list_web_acls()["WebACLs"]:
if not self.audit_resources or (
Expand All @@ -80,7 +86,7 @@ def _list_web_acls(self, regional_client):
)

def _list_resources_for_web_acl(self, regional_client):
logger.info("WAF - Describing resources...")
logger.info("WAFRegional - Describing resources...")
try:
for acl in self.web_acls.values():
if acl.region == regional_client.region:
Expand All @@ -94,6 +100,72 @@ def _list_resources_for_web_acl(self, regional_client):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _get_web_acl(self, acl):
logger.info(f"WAFRegional - Getting Web ACL {acl.name}...")
try:
get_web_acl = self.regional_clients[acl.region].get_web_acl(WebACLId=acl.id)
for rule in get_web_acl.get("WebACL", {}).get("Rules", []):
rule_id = rule.get("RuleId", "")
if rule.get("Type", "") == "GROUP":
acl.rule_groups.append(ACLRule(id=rule_id))
else:
acl.rules.append(ACLRule(id=rule_id))

except Exception as error:
logger.error(
f"{acl.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _list_rules(self, regional_client):
logger.info("WAFRegional - Listing Regional Rules...")
try:
for rule in regional_client.list_rules().get("Rules", []):
arn = f"arn:aws:waf-regional:{regional_client.region}:{self.audited_account}:rule/{rule['RuleId']}"
self.rules[arn] = Rule(
arn=arn,
id=rule.get("RuleId", ""),
region=regional_client.region,
name=rule.get("Name", ""),
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _get_rule(self, rule):
logger.info(f"WAFRegional - Getting Rule {rule.name}...")
try:
get_rule = self.regional_clients[rule.region].get_rule(RuleId=rule.id)
for predicate in get_rule.get("Rule", {}).get("Predicates", []):
rule.predicates.append(
Predicate(
HugoPBrito marked this conversation as resolved.
Show resolved Hide resolved
negated=predicate.get("Negated", False),
data_id=predicate.get("DataId", ""),
)
)
except KeyError:
logger.error(f"Rule {rule.name} not found in {rule.region}.")


class Predicate(BaseModel):
negated: bool
data_id: str


class ACLRule(BaseModel):
id: str

sergargar marked this conversation as resolved.
Show resolved Hide resolved

class Rule(BaseModel):
"""Rule Model for WAF and WAFRegional"""

arn: str
id: str
region: str
name: str
predicates: list[Predicate] = []
tags: Optional[list] = []


class WebAcl(BaseModel):
"""Web ACL Model for WAF and WAFRegional"""
Expand All @@ -103,3 +175,6 @@ class WebAcl(BaseModel):
id: str
albs: list[str]
region: str
rules: list[Rule] = []
rule_groups: list[Rule] = []
tags: Optional[list] = []
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
from unittest import mock
from unittest.mock import patch

import botocore
from moto import mock_aws

from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)

RULE_ID = "test-rule-id"

# Original botocore _make_api_call function
orig = botocore.client.BaseClient._make_api_call


# Mocked botocore _make_api_call function
def mock_make_api_call_compliant_rule(self, operation_name, kwarg):
if operation_name == "ListRules":
return {
"Rules": [
{
"RuleId": RULE_ID,
"Name": RULE_ID,
},
]
}
if operation_name == "GetRule":
return {
"Rule": {
"RuleId": RULE_ID,
"Predicates": [
{
"Negated": False,
"Type": "IPMatch",
"DataId": "IPSetId",
},
],
}
}
return orig(self, operation_name, kwarg)


def mock_make_api_call_non_compliant_rule(self, operation_name, kwarg):
if operation_name == "ListRules":
return {
"Rules": [
{
"RuleId": RULE_ID,
"Name": RULE_ID,
},
]
}
if operation_name == "GetRule":
return {
"Rule": {
"RuleId": RULE_ID,
"Predicates": [],
}
}
return orig(self, operation_name, kwarg)


class Test_waf_rule_has_conditions:
@mock_aws
def test_no_rules(self):
from prowler.providers.aws.services.waf.waf_service import WAFRegional

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.waf.waf_rule_has_conditions.waf_rule_has_conditions.wafregional_client",
new=WAFRegional(aws_provider),
):
# Test Check
from prowler.providers.aws.services.waf.waf_rule_has_conditions.waf_rule_has_conditions import (
waf_rule_has_conditions,
)

check = waf_rule_has_conditions()
result = check.execute()

assert len(result) == 0

@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_compliant_rule,
)
@mock_aws
def test_waf_rules_with_condition(self):
from prowler.providers.aws.services.waf.waf_service import WAFRegional

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.waf.waf_rule_has_conditions.waf_rule_has_conditions.wafregional_client",
new=WAFRegional(aws_provider),
):
# Test Check
from prowler.providers.aws.services.waf.waf_rule_has_conditions.waf_rule_has_conditions import (
waf_rule_has_conditions,
)

check = waf_rule_has_conditions()
result = check.execute()

assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"AWS WAFRegional Classic Regional Rule {RULE_ID} has at least one condition."
)
assert result[0].resource_id == RULE_ID
assert (
result[0].resource_arn
== f"arn:aws:waf-regional:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:rule/{RULE_ID}"
)
assert result[0].region == AWS_REGION_US_EAST_1

@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_non_compliant_rule,
)
@mock_aws
def test_waf_rules_without_condition(self):
from prowler.providers.aws.services.waf.waf_service import WAFRegional

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.waf.waf_rule_has_conditions.waf_rule_has_conditions.wafregional_client",
new=WAFRegional(aws_provider),
):
# Test Check
from prowler.providers.aws.services.waf.waf_rule_has_conditions.waf_rule_has_conditions import (
waf_rule_has_conditions,
)

check = waf_rule_has_conditions()
result = check.execute()

assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"AWS WAFRegional Classic Regional Rule {RULE_ID} does not have any conditions."
)
assert result[0].resource_id == RULE_ID
assert (
result[0].resource_arn
== f"arn:aws:waf-regional:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:rule/{RULE_ID}"
)
assert result[0].region == AWS_REGION_US_EAST_1
Loading
Loading