Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(cis): add CIS output class #4400

72 changes: 72 additions & 0 deletions prowler/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
from prowler.lib.cli.parser import ProwlerArgumentParser
from prowler.lib.logger import logger, set_logging_config
from prowler.lib.outputs.asff.asff import ASFF
from prowler.lib.outputs.compliance.cis.cis_aws import AWSCIS
from prowler.lib.outputs.compliance.cis.cis_azure import AzureCIS
from prowler.lib.outputs.compliance.cis.cis_gcp import GCPCIS
from prowler.lib.outputs.compliance.cis.cis_kubernetes import KubernetesCIS

Check warning on line 49 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L46-L49

Added lines #L46 - L49 were not covered by tests
from prowler.lib.outputs.compliance.compliance import display_compliance_table
from prowler.lib.outputs.csv.models import CSV
from prowler.lib.outputs.finding import Finding
Expand Down Expand Up @@ -355,6 +359,74 @@
bucket_session,
)

# Compliance Frameworks
input_compliance_frameworks = set(

Check warning on line 363 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L362-L363

Added lines #L362 - L363 were not covered by tests
global_provider.output_options.output_modes
).intersection(get_available_compliance_frameworks(provider))

Check warning on line 365 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L365

Added line #L365 was not covered by tests
if provider == "aws":
for compliance_name in input_compliance_frameworks:
if compliance_name.startswith("cis_"):
# Generate CIS Finding Object

Check warning on line 369 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L369

Added line #L369 was not covered by tests
filename = (
f"{global_provider.output_options.output_directory}/compliance/"
f"{global_provider.output_options.output_filename}_{compliance_name}.csv"
)
cis_finding = AWSCIS(
findings=finding_outputs,

Check warning on line 375 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L375

Added line #L375 was not covered by tests
compliance=bulk_compliance_frameworks[compliance_name],
create_file_descriptor=True,
file_path=filename,
)

Check warning on line 379 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L377-L379

Added lines #L377 - L379 were not covered by tests
cis_finding.batch_write_data_to_file()

Check warning on line 381 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L381

Added line #L381 was not covered by tests
elif provider == "azure":
for compliance_name in input_compliance_frameworks:
if compliance_name.startswith("cis_"):
# Generate CIS Finding Object

Check warning on line 385 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L385

Added line #L385 was not covered by tests
filename = (
f"{global_provider.output_options.output_directory}/compliance/"
f"{global_provider.output_options.output_filename}_{compliance_name}.csv"
)
cis_finding = AzureCIS(
findings=finding_outputs,

Check warning on line 391 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L391

Added line #L391 was not covered by tests
compliance=bulk_compliance_frameworks[compliance_name],
create_file_descriptor=True,
file_path=filename,
)

Check warning on line 395 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L393-L395

Added lines #L393 - L395 were not covered by tests
cis_finding.batch_write_data_to_file()

Check warning on line 397 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L397

Added line #L397 was not covered by tests
elif provider == "gcp":
for compliance_name in input_compliance_frameworks:
if compliance_name.startswith("cis_"):
# Generate CIS Finding Object

Check warning on line 401 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L401

Added line #L401 was not covered by tests
filename = (
f"{global_provider.output_options.output_directory}/compliance/"
f"{global_provider.output_options.output_filename}_{compliance_name}.csv"
)
cis_finding = GCPCIS(
findings=finding_outputs,

Check warning on line 407 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L407

Added line #L407 was not covered by tests
compliance=bulk_compliance_frameworks[compliance_name],
create_file_descriptor=True,
file_path=filename,
)

Check warning on line 411 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L409-L411

Added lines #L409 - L411 were not covered by tests
cis_finding.batch_write_data_to_file()

Check warning on line 413 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L413

Added line #L413 was not covered by tests
elif provider == "kubernetes":
for compliance_name in input_compliance_frameworks:
if compliance_name.startswith("cis_"):
# Generate CIS Finding Object

Check warning on line 417 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L417

Added line #L417 was not covered by tests
filename = (
f"{global_provider.output_options.output_directory}/compliance/"
f"{global_provider.output_options.output_filename}_{compliance_name}.csv"
)
cis_finding = KubernetesCIS(
findings=finding_outputs,

Check warning on line 423 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L423

Added line #L423 was not covered by tests
compliance=bulk_compliance_frameworks[compliance_name],
create_file_descriptor=True,
file_path=filename,
)

Check warning on line 427 in prowler/__main__.py

View check run for this annotation

Codecov / codecov/patch

prowler/__main__.py#L426-L427

Added lines #L426 - L427 were not covered by tests
cis_finding.batch_write_data_to_file()

# AWS Security Hub Integration
if provider == "aws" and args.security_hub:
print(
Expand Down
6 changes: 3 additions & 3 deletions prowler/lib/check/compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from pydantic import parse_obj_as

from prowler.lib.check.compliance_models import Compliance_Base_Model
from prowler.lib.check.compliance_models import ComplianceBaseModel
from prowler.lib.check.models import Check_Metadata_Model
from prowler.lib.logger import logger

Expand All @@ -22,7 +22,7 @@ def update_checks_metadata_with_compliance(
# Include the requirement into the check's framework requirements
compliance_requirements.append(requirement)
# Create the Compliance_Model
compliance = Compliance_Base_Model(
compliance = ComplianceBaseModel(
Framework=framework.Framework,
Provider=framework.Provider,
Version=framework.Version,
Expand All @@ -43,7 +43,7 @@ def update_checks_metadata_with_compliance(
if not requirement.Checks:
compliance_requirements.append(requirement)
# Create the Compliance_Model
compliance = Compliance_Base_Model(
compliance = ComplianceBaseModel(
Framework=framework.Framework,
Provider=framework.Provider,
Version=framework.Version,
Expand Down
8 changes: 4 additions & 4 deletions prowler/lib/check/compliance_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@
Checks: list[str]


class Compliance_Base_Model(BaseModel):
"""Compliance_Base_Model holds the base model for every compliance framework"""
class ComplianceBaseModel(BaseModel):
"""ComplianceBaseModel holds the base model for every compliance framework"""

Framework: str
Provider: str
Expand Down Expand Up @@ -218,10 +218,10 @@
# Testing Pending
def load_compliance_framework(
compliance_specification_file: str,
) -> Compliance_Base_Model:
) -> ComplianceBaseModel:
"""load_compliance_framework loads and parse a Compliance Framework Specification"""
try:
compliance_framework = Compliance_Base_Model.parse_file(
compliance_framework = ComplianceBaseModel.parse_file(

Check warning on line 224 in prowler/lib/check/compliance_models.py

View check run for this annotation

Codecov / codecov/patch

prowler/lib/check/compliance_models.py#L224

Added line #L224 was not covered by tests
compliance_specification_file
)
except ValidationError as error:
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,6 @@
from tabulate import tabulate

from prowler.config.config import orange_color
from prowler.lib.logger import logger
from prowler.lib.outputs.compliance.cis_aws import generate_compliance_row_cis_aws
from prowler.lib.outputs.compliance.cis_azure import generate_compliance_row_cis_azure
from prowler.lib.outputs.compliance.cis_gcp import generate_compliance_row_cis_gcp
from prowler.lib.outputs.compliance.cis_kubernetes import (
generate_compliance_row_cis_kubernetes,
)
from prowler.lib.outputs.csv.csv import write_csv


def write_compliance_row_cis(
file_descriptors,
finding,
compliance,
output_options,
provider,
input_compliance_frameworks,
):
try:
compliance_output = (
"cis_" + compliance.Version + "_" + compliance.Provider.lower()
)

# Only with the version of CIS that was selected
if compliance_output in str(input_compliance_frameworks):
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
if compliance.Provider == "AWS":
(compliance_row, csv_header) = generate_compliance_row_cis_aws(
finding,
compliance,
requirement,
attribute,
output_options,
provider,
)
elif compliance.Provider == "Azure":
(compliance_row, csv_header) = (
generate_compliance_row_cis_azure(
finding,
compliance,
requirement,
attribute,
output_options,
)
)
elif compliance.Provider == "GCP":
(compliance_row, csv_header) = generate_compliance_row_cis_gcp(
finding, compliance, requirement, attribute, output_options
)
elif compliance.Provider == "Kubernetes":
(compliance_row, csv_header) = (
generate_compliance_row_cis_kubernetes(
finding,
compliance,
requirement,
attribute,
output_options,
provider,
)
)

write_csv(
file_descriptors[compliance_output], csv_header, compliance_row
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)


def get_cis_table(
Expand Down
99 changes: 99 additions & 0 deletions prowler/lib/outputs/compliance/cis/cis_aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from csv import DictWriter
from venv import logger

from prowler.lib.check.compliance_models import ComplianceBaseModel
from prowler.lib.outputs.compliance.cis.models import AWS
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
from prowler.lib.outputs.finding import Finding


class AWSCIS(ComplianceOutput):
"""
This class represents the AWS CIS compliance output.

Attributes:
- _data (list): A list to store transformed data from findings.
- _file_descriptor (TextIOWrapper): A file descriptor to write data to a file.

Methods:
- transform: Transforms findings into AWS CIS compliance format.
- batch_write_data_to_file: Writes the findings data to a CSV file in AWS CIS compliance format.
"""

def transform(
self,
findings: list[Finding],
compliance: ComplianceBaseModel,
compliance_name: str,
) -> None:
"""
Transforms a list of findings into AWS CIS compliance format.

Parameters:
- findings (list): A list of findings.
- compliance (ComplianceBaseModel): A compliance model.
- compliance_name (str): The name of the compliance model.

Returns:
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AWS(
Provider=finding.provider,
Description=compliance.Description,
AccountId=finding.account_uid,
Region=finding.region,
AssessmentDate=str(finding.timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_Section=attribute.Section,
Requirements_Attributes_Profile=attribute.Profile,
Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus,
Requirements_Attributes_Description=attribute.Description,
Requirements_Attributes_RationaleStatement=attribute.RationaleStatement,
Requirements_Attributes_ImpactStatement=attribute.ImpactStatement,
Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure,
Requirements_Attributes_AuditProcedure=attribute.AuditProcedure,
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
Requirements_Attributes_References=attribute.References,
Status=finding.status,
StatusExtended=finding.status_extended,
ResourceId=finding.resource_uid,
CheckId=finding.check_id,
Muted=finding.muted,
)
self._data.append(compliance_row)

def batch_write_data_to_file(self) -> None:
"""
Writes the findings data to a CSV file in AWS CIS compliance format.

Returns:
- None
"""
try:
if (
getattr(self, "_file_descriptor", None)
and not self._file_descriptor.closed
and self._data
):
csv_writer = DictWriter(
self._file_descriptor,
fieldnames=[field.upper() for field in self._data[0].dict().keys()],
delimiter=";",
)
csv_writer.writeheader()
for finding in self._data:
csv_writer.writerow(
{k.upper(): v for k, v in finding.dict().items()}
)
self._file_descriptor.close()
except Exception as error:
logger.error(

Check warning on line 97 in prowler/lib/outputs/compliance/cis/cis_aws.py

View check run for this annotation

Codecov / codecov/patch

prowler/lib/outputs/compliance/cis/cis_aws.py#L96-L97

Added lines #L96 - L97 were not covered by tests
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
Loading