Skip to content

Commit

Permalink
chore(aws): add AWS Well-Architected output class (#4439)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergargar authored Jul 12, 2024
1 parent a505776 commit 027aa97
Show file tree
Hide file tree
Showing 14 changed files with 292 additions and 111 deletions.
16 changes: 16 additions & 0 deletions prowler/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
from prowler.lib.cli.parser import ProwlerArgumentParser
from prowler.lib.logger import logger, set_logging_config
from prowler.lib.outputs.asff.asff import ASFF
from prowler.lib.outputs.compliance.aws_well_architected.aws_well_architected import (
AWSWellArchitected,
)
from prowler.lib.outputs.compliance.cis.cis_aws import AWSCIS
from prowler.lib.outputs.compliance.cis.cis_azure import AzureCIS
from prowler.lib.outputs.compliance.cis.cis_gcp import GCPCIS
Expand Down Expand Up @@ -405,6 +408,19 @@ def prowler():
file_path=filename,
)
ens_finding.batch_write_data_to_file()
elif compliance_name.startswith("aws_well_architected_framework"):
# Generate AWS Well-Architected Finding Object
filename = (
f"{global_provider.output_options.output_directory}/compliance/"
f"{global_provider.output_options.output_filename}_{compliance_name}.csv"
)
aws_well_architected_finding = AWSWellArchitected(
findings=finding_outputs,
compliance=bulk_compliance_frameworks[compliance_name],
create_file_descriptor=True,
file_path=filename,
)
aws_well_architected_finding.batch_write_data_to_file()

elif provider == "azure":
for compliance_name in input_compliance_frameworks:
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from csv import DictWriter
from venv import logger

from prowler.lib.check.compliance_models import ComplianceBaseModel
from prowler.lib.outputs.compliance.aws_well_architected.models import (
AWSWellArchitected as AWSWellArchitectedModel,
)
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
from prowler.lib.outputs.finding import Finding


class AWSWellArchitected(ComplianceOutput):
"""
This class represents the AWS Well-Architected compliance output.
Attributes:
- _data (list): A list to store transformed data from findings.
- _file_descriptor (TextIOWrapper): A file descriptor to write data to a file.
Methods:
- transform: Transforms findings into AWS Well-Architected compliance format.
- batch_write_data_to_file: Writes the findings data to a CSV file in AWS Well-Architected compliance format.
"""

def transform(
self,
findings: list[Finding],
compliance: ComplianceBaseModel,
compliance_name: str,
) -> None:
"""
Transforms a list of findings into AWS Well-Architected compliance format.
Parameters:
- findings (list): A list of findings.
- compliance (ComplianceBaseModel): A compliance model.
- compliance_name (str): The name of the compliance model.
Returns:
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AWSWellArchitectedModel(
Provider=finding.provider,
Description=compliance.Description,
AccountId=finding.account_uid,
Region=finding.region,
AssessmentDate=str(finding.timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_Name=attribute.Name,
Requirements_Attributes_WellArchitectedQuestionId=attribute.WellArchitectedQuestionId,
Requirements_Attributes_WellArchitectedPracticeId=attribute.WellArchitectedPracticeId,
Requirements_Attributes_Section=attribute.Section,
Requirements_Attributes_SubSection=attribute.SubSection,
Requirements_Attributes_LevelOfRisk=attribute.LevelOfRisk,
Requirements_Attributes_AssessmentMethod=attribute.AssessmentMethod,
Requirements_Attributes_Description=attribute.Description,
Requirements_Attributes_ImplementationGuidanceUrl=attribute.ImplementationGuidanceUrl,
Status=finding.status,
StatusExtended=finding.status_extended,
ResourceId=finding.resource_uid,
ResourceName=finding.resource_name,
CheckId=finding.check_id,
Muted=finding.muted,
)
self._data.append(compliance_row)

def batch_write_data_to_file(self) -> None:
"""
Writes the findings data to a CSV file in AWS Well-Architected compliance format.
Returns:
- None
"""
try:
if (
getattr(self, "_file_descriptor", None)
and not self._file_descriptor.closed
and self._data
):
csv_writer = DictWriter(
self._file_descriptor,
fieldnames=[field.upper() for field in self._data[0].dict().keys()],
delimiter=";",
)
csv_writer.writeheader()
for finding in self._data:
csv_writer.writerow(
{k.upper(): v for k, v in finding.dict().items()}
)
self._file_descriptor.close()
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
32 changes: 32 additions & 0 deletions prowler/lib/outputs/compliance/aws_well_architected/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Optional

from pydantic import BaseModel


class AWSWellArchitected(BaseModel):
"""
AWSWellArchitected generates a finding's output in AWS Well-Architected Framework format.
"""

Provider: str
Description: str
AccountId: str
Region: str
AssessmentDate: str
Requirements_Id: str
Requirements_Description: str
Requirements_Attributes_Name: str
Requirements_Attributes_WellArchitectedQuestionId: str
Requirements_Attributes_WellArchitectedPracticeId: str
Requirements_Attributes_Section: str
Requirements_Attributes_SubSection: Optional[str]
Requirements_Attributes_LevelOfRisk: str
Requirements_Attributes_AssessmentMethod: str
Requirements_Attributes_Description: str
Requirements_Attributes_ImplementationGuidanceUrl: str
Status: str
StatusExtended: str
ResourceId: str
CheckId: str
Muted: bool
ResourceName: str
60 changes: 0 additions & 60 deletions prowler/lib/outputs/compliance/aws_well_architected_framework.py

This file was deleted.

14 changes: 1 addition & 13 deletions prowler/lib/outputs/compliance/compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

from prowler.lib.check.models import Check_Report
from prowler.lib.logger import logger
from prowler.lib.outputs.compliance.aws_well_architected_framework import (
write_compliance_row_aws_well_architected_framework,
)
from prowler.lib.outputs.compliance.cis.cis import get_cis_table
from prowler.lib.outputs.compliance.ens.ens import get_ens_table
from prowler.lib.outputs.compliance.generic import (
Expand Down Expand Up @@ -100,17 +97,8 @@ def fill_compliance(
continue
elif compliance.Framework == "ENS":
continue
elif compliance.Framework == "MITRE-ATTACK" and compliance.Version == "":
elif "AWS-Well-Architected-Framework" in compliance.Framework:
continue

elif (
"AWS-Well-Architected-Framework" in compliance.Framework
and compliance.Provider == "AWS"
):
write_compliance_row_aws_well_architected_framework(
file_descriptors, finding, compliance, output_options, provider
)

elif (
compliance.Framework == "ISO27001"
and compliance.Version == "2013"
Expand Down
Empty file.
26 changes: 0 additions & 26 deletions prowler/lib/outputs/compliance/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,6 @@ class Check_Output_CSV_Generic_Compliance(BaseModel):
Muted: bool


class Check_Output_CSV_AWS_Well_Architected(BaseModel):
"""
Check_Output_CSV_AWS_Well_Architected generates a finding's output in CSV AWS Well Architected Compliance format.
"""

Provider: str
Description: str
AccountId: str
Region: str
AssessmentDate: str
Requirements_Attributes_Name: str
Requirements_Attributes_WellArchitectedQuestionId: str
Requirements_Attributes_WellArchitectedPracticeId: str
Requirements_Attributes_Section: str
Requirements_Attributes_SubSection: Optional[str]
Requirements_Attributes_LevelOfRisk: str
Requirements_Attributes_AssessmentMethod: str
Requirements_Attributes_Description: str
Requirements_Attributes_ImplementationGuidanceUrl: str
Status: str
StatusExtended: str
ResourceId: str
CheckId: str
Muted: bool


class Check_Output_CSV_AWS_ISO27001_2013(BaseModel):
"""
Check_Output_CSV_AWS_ISO27001_2013 generates a finding's output in CSV AWS ISO27001 Compliance format.
Expand Down
12 changes: 3 additions & 9 deletions prowler/lib/outputs/file_descriptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from prowler.lib.logger import logger
from prowler.lib.outputs.compliance.models import (
Check_Output_CSV_AWS_ISO27001_2013,
Check_Output_CSV_AWS_Well_Architected,
Check_Output_CSV_Generic_Compliance,
)
from prowler.lib.outputs.csv.csv import generate_csv_fields
Expand Down Expand Up @@ -64,6 +63,8 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, provi
continue
elif "ens_" in output_mode:
continue
elif "aws_well_architected_framework" in output_mode:
continue

elif provider.type == "gcp":
filename = f"{output_directory}/compliance/{output_filename}_{output_mode}{csv_file_suffix}"
Expand Down Expand Up @@ -92,14 +93,7 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, provi
elif provider.type == "aws":
# Compliance frameworks
filename = f"{output_directory}/compliance/{output_filename}_{output_mode}{csv_file_suffix}"
if "aws_well_architected_framework" in output_mode:
file_descriptor = initialize_file_descriptor(
filename,
Check_Output_CSV_AWS_Well_Architected,
)
file_descriptors.update({output_mode: file_descriptor})

elif output_mode == "iso27001_2013_aws":
if output_mode == "iso27001_2013_aws":
file_descriptor = initialize_file_descriptor(
filename,
Check_Output_CSV_AWS_ISO27001_2013,
Expand Down
Loading

0 comments on commit 027aa97

Please sign in to comment.