From b76dd7614ebcdf761f5245ea8cfdfd64485c8cf5 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Mon, 16 Sep 2024 13:38:02 -0700 Subject: [PATCH 1/2] Update enrichments to always require checkout of mitre/cti repo. --- .github/workflows/test_against_escu.yml | 5 +- contentctl/actions/validate.py | 2 +- contentctl/contentctl.py | 3 + contentctl/enrichments/attack_enrichment.py | 128 ++++++++------------ contentctl/objects/config.py | 41 ++++++- 5 files changed, 94 insertions(+), 85 deletions(-) diff --git a/.github/workflows/test_against_escu.yml b/.github/workflows/test_against_escu.yml index 4c8fa3d2..b527a6ee 100644 --- a/.github/workflows/test_against_escu.yml +++ b/.github/workflows/test_against_escu.yml @@ -53,10 +53,11 @@ jobs: poetry install --no-interaction - - name: Clone the AtomicRedTeam Repo (for extended validation) + - name: Clone the AtomicRedTeam Repo and the Mitre/CTI repos for testing enrichments run: | cd security_content - git clone --depth 1 https://github.com/redcanaryco/atomic-red-team + git clone --single-branch https://github.com/redcanaryco/atomic-red-team external_repos/atomic-red-team + git clone --single-branch https://github.com/mitre/cti external_repos/cti # We do not separately run validate and build diff --git a/contentctl/actions/validate.py b/contentctl/actions/validate.py index cbfa7615..909256a4 100644 --- a/contentctl/actions/validate.py +++ b/contentctl/actions/validate.py @@ -16,7 +16,7 @@ def execute(self, input_dto: validate) -> DirectorOutputDto: director_output_dto = DirectorOutputDto( AtomicTest.getAtomicTestsFromArtRepo( - repo_path=input_dto.getAtomicRedTeamRepoPath(), + repo_path=input_dto.atomic_red_team_repo_path, enabled=input_dto.enrichments, ), AttackEnrichment.getAttackEnrichment(input_dto), diff --git a/contentctl/contentctl.py b/contentctl/contentctl.py index e6d8c5d7..dbf434a7 100644 --- a/contentctl/contentctl.py +++ b/contentctl/contentctl.py @@ -211,6 +211,9 @@ def main(): test_common_func(config) else: raise Exception(f"Unknown command line type '{type(config).__name__}'") + except FileNotFoundError as e: + print(e) + sys.exit(1) except Exception as e: if config is None: print("There was a serious issue where the config file could not be created.\n" diff --git a/contentctl/enrichments/attack_enrichment.py b/contentctl/enrichments/attack_enrichment.py index 4943972d..c01d2b99 100644 --- a/contentctl/enrichments/attack_enrichment.py +++ b/contentctl/enrichments/attack_enrichment.py @@ -1,14 +1,13 @@ from __future__ import annotations -import csv -import os import sys from attackcti import attack_client import logging -from pydantic import BaseModel, Field +from pydantic import BaseModel from dataclasses import field -from typing import Annotated,Any -from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment +from typing import Any +from pathlib import Path +from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment, MitreTactics from contentctl.objects.config import validate from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE logging.getLogger('taxii2client').setLevel(logging.CRITICAL) @@ -21,7 +20,7 @@ class AttackEnrichment(BaseModel): @staticmethod def getAttackEnrichment(config:validate)->AttackEnrichment: enrichment = AttackEnrichment(use_enrichment=config.enrichments) - _ = enrichment.get_attack_lookup(str(config.path)) + _ = enrichment.get_attack_lookup(config.mitre_cti_repo_path, config.enrichments) return enrichment def getEnrichmentByMitreID(self, mitre_id:MITRE_ATTACK_ID_TYPE)->MitreAttackEnrichment: @@ -34,71 +33,69 @@ def getEnrichmentByMitreID(self, mitre_id:MITRE_ATTACK_ID_TYPE)->MitreAttackEnri else: raise Exception(f"Error, Unable to find Mitre Enrichment for MitreID {mitre_id}") - def addMitreIDViaGroupNames(self, technique:dict, tactics:list[str], groupNames:list[str])->None: + def addMitreIDViaGroupNames(self, technique:dict[str,Any], tactics:list[str], groupNames:list[str])->None: technique_id = technique['technique_id'] technique_obj = technique['technique'] tactics.sort() if technique_id in self.data: raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'") - self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id, - mitre_attack_technique=technique_obj, - mitre_attack_tactics=tactics, - mitre_attack_groups=groupNames, - mitre_attack_group_objects=[]) + self.data[technique_id] = MitreAttackEnrichment.model_validate({'mitre_attack_id':technique_id, + 'mitre_attack_technique':technique_obj, + 'mitre_attack_tactics':tactics, + 'mitre_attack_groups':groupNames, + 'mitre_attack_group_objects':[]}) - def addMitreIDViaGroupObjects(self, technique:dict, tactics:list[str], groupObjects:list[dict[str,Any]])->None: + def addMitreIDViaGroupObjects(self, technique:dict[str,Any], tactics:list[MitreTactics], groupDicts:list[dict[str,Any]])->None: technique_id = technique['technique_id'] technique_obj = technique['technique'] tactics.sort() - groupNames:list[str] = sorted([group['group'] for group in groupObjects]) + groupNames:list[str] = sorted([group['group'] for group in groupDicts]) if technique_id in self.data: raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'") - self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id, - mitre_attack_technique=technique_obj, - mitre_attack_tactics=tactics, - mitre_attack_groups=groupNames, - mitre_attack_group_objects=groupObjects) + + self.data[technique_id] = MitreAttackEnrichment.model_validate({'mitre_attack_id': technique_id, + 'mitre_attack_technique': technique_obj, + 'mitre_attack_tactics': tactics, + 'mitre_attack_groups': groupNames, + 'mitre_attack_group_objects': groupDicts}) - def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cached_or_offline: bool = False, skip_enrichment:bool = False) -> dict: - if not self.use_enrichment: - return {} - print("Getting MITRE Attack Enrichment Data. This may take some time...") - attack_lookup = dict() - file_path = os.path.join(input_path, "app_template", "lookups", "mitre_enrichment.csv") - - if skip_enrichment is True: - print("Skipping enrichment") + def get_attack_lookup(self, input_path: Path, enrichments:bool = False) -> dict[str,MitreAttackEnrichment]: + attack_lookup:dict[str,MitreAttackEnrichment] = {} + if not enrichments: return attack_lookup + try: - - if force_cached_or_offline is True: - raise(Exception("WARNING - Using cached MITRE Attack Enrichment. Attack Enrichment may be out of date. Only use this setting for offline environments and development purposes.")) - print(f"\r{'Client'.rjust(23)}: [{0:3.0f}%]...", end="", flush=True) - lift = attack_client() - print(f"\r{'Client'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True) + print(f"Performing MITRE Enrichment using the repository at {input_path}...",end="", flush=True) + # The existence of the input_path is validated during cli argument validation, but it is + # possible that the repo is in the wrong format. If the following directories do not + # exist, then attack_client will fall back to resolving via REST API. We do not + # want this as it is slow and error prone, so we will force an exception to + # be generated. + enterprise_path = input_path/"enterprise-attack" + mobile_path = input_path/"ics-attack" + ics_path = input_path/"mobile-attack" + if not (enterprise_path.is_dir() and mobile_path.is_dir() and ics_path.is_dir()): + raise FileNotFoundError("One or more of the following paths does not exist: " + f"{[str(enterprise_path),str(mobile_path),str(ics_path)]}. " + f"Please ensure that the {input_path} directory " + "has been git cloned correctly.") + lift = attack_client( + local_paths= { + "enterprise":str(enterprise_path), + "mobile":str(mobile_path), + "ics":str(ics_path) + } + ) - print(f"\r{'Techniques'.rjust(23)}: [{0.0:3.0f}%]...", end="", flush=True) all_enterprise_techniques = lift.get_enterprise_techniques(stix_format=False) - - print(f"\r{'Techniques'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True) - - print(f"\r{'Relationships'.rjust(23)}: [{0.0:3.0f}%]...", end="", flush=True) enterprise_relationships = lift.get_enterprise_relationships(stix_format=False) - print(f"\r{'Relationships'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True) - - print(f"\r{'Groups'.rjust(23)}: [{0:3.0f}%]...", end="", flush=True) enterprise_groups = lift.get_enterprise_groups(stix_format=False) - print(f"\r{'Groups'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True) - - for index, technique in enumerate(all_enterprise_techniques): - progress_percent = ((index+1)/len(all_enterprise_techniques)) * 100 - if (sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()): - print(f"\r\t{'MITRE Technique Progress'.rjust(23)}: [{progress_percent:3.0f}%]...", end="", flush=True) + for technique in all_enterprise_techniques: apt_groups:list[dict[str,Any]] = [] for relationship in enterprise_relationships: if (relationship['target_object'] == technique['id']) and relationship['source_object'].startswith('intrusion-set'): @@ -115,39 +112,10 @@ def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cach self.addMitreIDViaGroupObjects(technique, tactics, apt_groups) attack_lookup[technique['technique_id']] = {'technique': technique['technique'], 'tactics': tactics, 'groups': apt_groups} - if store_csv: - f = open(file_path, 'w') - writer = csv.writer(f) - writer.writerow(['mitre_id', 'technique', 'tactics' ,'groups']) - for key in attack_lookup.keys(): - if len(attack_lookup[key]['groups']) == 0: - groups = 'no' - else: - groups = '|'.join(attack_lookup[key]['groups']) - - writer.writerow([ - key, - attack_lookup[key]['technique'], - '|'.join(attack_lookup[key]['tactics']), - groups - ]) - - f.close() + except Exception as err: - print(f'\nError: {str(err)}') - print('Use local copy app_template/lookups/mitre_enrichment.csv') - with open(file_path, mode='r') as inp: - reader = csv.reader(inp) - attack_lookup = {rows[0]:{'technique': rows[1], 'tactics': rows[2].split('|'), 'groups': rows[3].split('|')} for rows in reader} - attack_lookup.pop('mitre_id') - for key in attack_lookup.keys(): - technique_input = {'technique_id': key , 'technique': attack_lookup[key]['technique'] } - tactics_input = attack_lookup[key]['tactics'] - groups_input = attack_lookup[key]['groups'] - self.addMitreIDViaGroupNames(technique=technique_input, tactics=tactics_input, groups=groups_input) - - - + raise Exception(f"Error getting MITRE Enrichment: {str(err)}") + print("Done!") return attack_lookup \ No newline at end of file diff --git a/contentctl/objects/config.py b/contentctl/objects/config.py index cedb4ae7..d506362b 100644 --- a/contentctl/objects/config.py +++ b/contentctl/objects/config.py @@ -191,8 +191,45 @@ class validate(Config_Base): build_api: bool = Field(default=False, description="Should api objects be built and output in the build_path?") data_source_TA_validation: bool = Field(default=False, description="Validate latest TA information from Splunkbase") - def getAtomicRedTeamRepoPath(self, atomic_red_team_repo_name:str = "atomic-red-team"): - return self.path/atomic_red_team_repo_name + @property + def external_repos_path(self)->pathlib.Path: + return self.path/"external_repos" + + @property + def mitre_cti_repo_path(self)->pathlib.Path: + return self.external_repos_path/"cti" + + @property + def atomic_red_team_repo_path(self): + return self.external_repos_path/"atomic-red-team" + + @model_validator(mode="after") + def ensureEnrichmentReposPresent(self)->Self: + ''' + Ensures that the enrichments repos, the atomic red team repo and the + mitre attack enrichment repo, are present at the inded path. + Raises a detailed exception if either of these are not present + when enrichments are enabled. + ''' + if not self.enrichments: + return self + # If enrichments are enabled, ensure that all of the + # enrichment directories exist + missing_repos:list[str] = [] + if not self.atomic_red_team_repo_path.is_dir(): + missing_repos.append(f"https://github.com/redcanaryco/atomic-red-team {self.atomic_red_team_repo_path}") + + if not self.mitre_cti_repo_path.is_dir(): + missing_repos.append(f"https://github.com/mitre/cti {self.mitre_cti_repo_path}") + + if len(missing_repos) > 0: + msg_list = ["The following repositories, which are required for enrichment, have not " + f"been checked out to the {self.external_repos_path} directory. " + "Please check them out using the following commands:"] + msg_list.extend([f"git clone --single-branch {repo_string}" for repo_string in missing_repos]) + msg = '\n\t'.join(msg_list) + raise FileNotFoundError(msg) + return self class report(validate): #reporting takes no extra args, but we define it here so that it can be a mode on the command line From 1974503bc3463842b5a02c7ced0023b370ed5eb7 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Tue, 17 Sep 2024 12:10:56 -0700 Subject: [PATCH 2/2] More cleanup and error handling to how enrichment is done. It now works as expected and the proper errors are thrown for AtomicEnrichment depending on the --enrichments cli setting. --- contentctl/actions/validate.py | 10 +- contentctl/enrichments/attack_enrichment.py | 2 +- contentctl/input/director.py | 6 +- contentctl/objects/atomic.py | 128 ++++++++------------ contentctl/objects/detection_tags.py | 12 +- 5 files changed, 64 insertions(+), 94 deletions(-) diff --git a/contentctl/actions/validate.py b/contentctl/actions/validate.py index 909256a4..a6fce6dc 100644 --- a/contentctl/actions/validate.py +++ b/contentctl/actions/validate.py @@ -5,20 +5,16 @@ from contentctl.objects.config import validate from contentctl.enrichments.attack_enrichment import AttackEnrichment from contentctl.enrichments.cve_enrichment import CveEnrichment -from contentctl.objects.atomic import AtomicTest +from contentctl.objects.atomic import AtomicEnrichment from contentctl.helper.utils import Utils from contentctl.objects.data_source import DataSource from contentctl.helper.splunk_app import SplunkApp class Validate: - def execute(self, input_dto: validate) -> DirectorOutputDto: - + def execute(self, input_dto: validate) -> DirectorOutputDto: director_output_dto = DirectorOutputDto( - AtomicTest.getAtomicTestsFromArtRepo( - repo_path=input_dto.atomic_red_team_repo_path, - enabled=input_dto.enrichments, - ), + AtomicEnrichment.getAtomicEnrichment(input_dto), AttackEnrichment.getAttackEnrichment(input_dto), CveEnrichment.getCveEnrichment(input_dto), [], diff --git a/contentctl/enrichments/attack_enrichment.py b/contentctl/enrichments/attack_enrichment.py index c01d2b99..71e1c955 100644 --- a/contentctl/enrichments/attack_enrichment.py +++ b/contentctl/enrichments/attack_enrichment.py @@ -25,7 +25,7 @@ def getAttackEnrichment(config:validate)->AttackEnrichment: def getEnrichmentByMitreID(self, mitre_id:MITRE_ATTACK_ID_TYPE)->MitreAttackEnrichment: if not self.use_enrichment: - raise Exception(f"Error, trying to add Mitre Enrichment, but use_enrichment was set to False") + raise Exception("Error, trying to add Mitre Enrichment, but use_enrichment was set to False") enrichment = self.data.get(mitre_id, None) if enrichment is not None: diff --git a/contentctl/input/director.py b/contentctl/input/director.py index a75090eb..c6d3b9c3 100644 --- a/contentctl/input/director.py +++ b/contentctl/input/director.py @@ -18,7 +18,7 @@ from contentctl.objects.deployment import Deployment from contentctl.objects.macro import Macro from contentctl.objects.lookup import Lookup -from contentctl.objects.atomic import AtomicTest +from contentctl.objects.atomic import AtomicEnrichment from contentctl.objects.security_content_object import SecurityContentObject from contentctl.objects.data_source import DataSource from contentctl.objects.event_source import EventSource @@ -39,7 +39,7 @@ class DirectorOutputDto: # Atomic Tests are first because parsing them # is far quicker than attack_enrichment - atomic_tests: None | list[AtomicTest] + atomic_enrichment: AtomicEnrichment attack_enrichment: AttackEnrichment cve_enrichment: CveEnrichment detections: list[Detection] @@ -145,7 +145,7 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None: f for f in files ] else: - raise (Exception(f"Cannot createSecurityContent for unknown product.")) + raise (Exception(f"Cannot createSecurityContent for unknown product {contentType}.")) validation_errors = [] diff --git a/contentctl/objects/atomic.py b/contentctl/objects/atomic.py index 63926ede..a723304d 100644 --- a/contentctl/objects/atomic.py +++ b/contentctl/objects/atomic.py @@ -1,12 +1,15 @@ from __future__ import annotations +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from contentctl.objects.config import validate + from contentctl.input.yml_reader import YmlReader from pydantic import BaseModel, model_validator, ConfigDict, FilePath, UUID4 +import dataclasses from typing import List, Optional, Dict, Union, Self import pathlib - - from enum import StrEnum, auto - +import uuid class SupportedPlatform(StrEnum): windows = auto() @@ -84,15 +87,6 @@ class AtomicTest(BaseModel): dependencies: Optional[List[AtomicDependency]] = None dependency_executor_name: Optional[DependencyExecutorType] = None - @staticmethod - def AtomicTestWhenEnrichmentIsDisabled(auto_generated_guid: UUID4) -> AtomicTest: - return AtomicTest(name="Placeholder Atomic Test (enrichment disabled)", - auto_generated_guid=auto_generated_guid, - description="This is a placeholder AtomicTest. Because enrichments were not enabled, it has not been validated against the real Atomic Red Team Repo.", - supported_platforms=[], - executor=AtomicExecutor(name="Placeholder Executor (enrichment disabled)", - command="Placeholder command (enrichment disabled)")) - @staticmethod def AtomicTestWhenTestIsMissing(auto_generated_guid: UUID4) -> AtomicTest: return AtomicTest(name="Missing Atomic", @@ -100,31 +94,16 @@ def AtomicTestWhenTestIsMissing(auto_generated_guid: UUID4) -> AtomicTest: description="This is a placeholder AtomicTest. Either the auto_generated_guid is incorrect or it there was an exception while parsing its AtomicFile.", supported_platforms=[], executor=AtomicExecutor(name="Placeholder Executor (failed to find auto_generated_guid)", - command="Placeholder command (failed to find auto_generated_guid)")) - - - @classmethod - def getAtomicByAtomicGuid(cls, guid: UUID4, all_atomics:list[AtomicTest] | None)->AtomicTest: - if all_atomics is None: - return AtomicTest.AtomicTestWhenEnrichmentIsDisabled(guid) - matching_atomics = [atomic for atomic in all_atomics if atomic.auto_generated_guid == guid] - if len(matching_atomics) == 0: - raise ValueError(f"Unable to find atomic_guid {guid} in {len(all_atomics)} atomic_tests from ART Repo") - elif len(matching_atomics) > 1: - raise ValueError(f"Found {len(matching_atomics)} matching tests for atomic_guid {guid} in {len(all_atomics)} atomic_tests from ART Repo") - - return matching_atomics[0] + command="Placeholder command (failed to find auto_generated_guid)")) @classmethod - def parseArtRepo(cls, repo_path:pathlib.Path)->List[AtomicFile]: - if not repo_path.is_dir(): - print(f"WARNING: Atomic Red Team repo does NOT exist at {repo_path.absolute()}. You can check it out with:\n * git clone --single-branch https://github.com/redcanaryco/atomic-red-team. This will ONLY throw a validation error if you reference atomid_guids in your detection(s).") - return [] + def parseArtRepo(cls, repo_path:pathlib.Path)->dict[uuid.UUID, AtomicTest]: + test_mapping: dict[uuid.UUID, AtomicTest] = {} atomics_path = repo_path/"atomics" if not atomics_path.is_dir(): - print(f"WARNING: Atomic Red Team repo exists at {repo_path.absolute}, but atomics directory does NOT exist at {atomics_path.absolute()}. Was it deleted or renamed? This will ONLY throw a validation error if you reference atomid_guids in your detection(s).") - return [] - + raise FileNotFoundError(f"WARNING: Atomic Red Team repo exists at {repo_path}, " + f"but atomics directory does NOT exist at {atomics_path}. " + "Was it deleted or renamed?") atomic_files:List[AtomicFile] = [] error_messages:List[str] = [] @@ -133,6 +112,7 @@ def parseArtRepo(cls, repo_path:pathlib.Path)->List[AtomicFile]: atomic_files.append(cls.constructAtomicFile(obj_path)) except Exception as e: error_messages.append(f"File [{obj_path}]\n{str(e)}") + if len(error_messages) > 0: exceptions_string = '\n\n'.join(error_messages) print(f"WARNING: The following [{len(error_messages)}] ERRORS were generated when parsing the Atomic Red Team Repo.\n" @@ -140,38 +120,28 @@ def parseArtRepo(cls, repo_path:pathlib.Path)->List[AtomicFile]: "Note that this is only a warning and contentctl will ignore Atomics contained in these files.\n" f"However, if you have written a detection that references them, 'contentctl build --enrichments' will fail:\n\n{exceptions_string}") - return atomic_files + # Now iterate over all the files, collect all the tests, and return the dict mapping + redefined_guids:set[uuid.UUID] = set() + for atomic_file in atomic_files: + for atomic_test in atomic_file.atomic_tests: + if atomic_test.auto_generated_guid in test_mapping: + redefined_guids.add(atomic_test.auto_generated_guid) + else: + test_mapping[atomic_test.auto_generated_guid] = atomic_test + if len(redefined_guids) > 0: + guids_string = '\n\t'.join([str(guid) for guid in redefined_guids]) + raise Exception(f"The following [{len(redefined_guids)}] Atomic Test" + " auto_generated_guid(s) were defined more than once. " + f"auto_generated_guids MUST be unique:\n\t{guids_string}") + + print(f"Successfully parsed [{len(test_mapping)}] Atomic Red Team Tests!") + return test_mapping @classmethod def constructAtomicFile(cls, file_path:pathlib.Path)->AtomicFile: yml_dict = YmlReader.load_file(file_path) atomic_file = AtomicFile.model_validate(yml_dict) return atomic_file - - @classmethod - def getAtomicTestsFromArtRepo(cls, repo_path:pathlib.Path, enabled:bool=True)->list[AtomicTest] | None: - # Get all the atomic files. Note that if the ART repo is not found, we will not throw an error, - # but will not have any atomics. This means that if atomic_guids are referenced during validation, - # validation for those detections will fail - if not enabled: - return None - - atomic_files = cls.getAtomicFilesFromArtRepo(repo_path) - - atomic_tests:List[AtomicTest] = [] - for atomic_file in atomic_files: - atomic_tests.extend(atomic_file.atomic_tests) - print(f"Found [{len(atomic_tests)}] Atomic Simulations in the Atomic Red Team Repo!") - return atomic_tests - - - @classmethod - def getAtomicFilesFromArtRepo(cls, repo_path:pathlib.Path)->List[AtomicFile]: - return cls.parseArtRepo(repo_path) - - - - class AtomicFile(BaseModel): @@ -182,27 +152,31 @@ class AtomicFile(BaseModel): atomic_tests: List[AtomicTest] +class AtomicEnrichment(BaseModel): + data: dict[uuid.UUID,AtomicTest] = dataclasses.field(default_factory = dict) + use_enrichment: bool = False + @classmethod + def getAtomicEnrichment(cls, config:validate)->AtomicEnrichment: + enrichment = AtomicEnrichment(use_enrichment=config.enrichments) + if config.enrichments: + enrichment.data = AtomicTest.parseArtRepo(config.atomic_red_team_repo_path) + + return enrichment + + def getAtomic(self, atomic_guid: uuid.UUID)->AtomicTest: + if self.use_enrichment: + if atomic_guid in self.data: + return self.data[atomic_guid] + else: + raise Exception(f"Atomic with GUID {atomic_guid} not found.") + else: + # If enrichment is not enabled, for the sake of compatability + # return a stub test with no useful or meaningful information. + return AtomicTest.AtomicTestWhenTestIsMissing(atomic_guid) -# ATOMICS_PATH = pathlib.Path("./atomics") -# atomic_objects = [] -# atomic_simulations = [] -# for obj_path in ATOMICS_PATH.glob("**/T*.yaml"): -# try: -# with open(obj_path, 'r', encoding="utf-8") as obj_handle: -# obj_data = yaml.load(obj_handle, Loader=yaml.CSafeLoader) -# atomic_obj = AtomicFile.model_validate(obj_data) -# except Exception as e: -# print(f"Error parsing object at path {obj_path}: {str(e)}") -# print(f"We have successfully parsed {len(atomic_objects)}, however!") -# sys.exit(1) - -# print(f"Successfully parsed {obj_path}!") -# atomic_objects.append(atomic_obj) -# atomic_simulations += atomic_obj.atomic_tests + -# print(f"Successfully parsed all {len(atomic_objects)} files!") -# print(f"Successfully parsed all {len(atomic_simulations)} simulations!") diff --git a/contentctl/objects/detection_tags.py b/contentctl/objects/detection_tags.py index bae5a4d9..2998006f 100644 --- a/contentctl/objects/detection_tags.py +++ b/contentctl/objects/detection_tags.py @@ -1,6 +1,6 @@ from __future__ import annotations import uuid -from typing import TYPE_CHECKING, List, Optional, Annotated, Union +from typing import TYPE_CHECKING, List, Optional, Union from pydantic import ( BaseModel, Field, @@ -32,7 +32,7 @@ RiskLevel, SecurityContentProductName ) -from contentctl.objects.atomic import AtomicTest +from contentctl.objects.atomic import AtomicEnrichment, AtomicTest from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE, CVE_TYPE # TODO (#266): disable the use_enum_values configuration @@ -240,7 +240,7 @@ def mapAtomicGuidsToAtomicTests(cls, v: List[UUID4], info: ValidationInfo) -> Li if output_dto is None: raise ValueError("Context not provided to detection.detection_tags.atomic_guid validator") - all_tests: None | List[AtomicTest] = output_dto.atomic_tests + atomic_enrichment: AtomicEnrichment = output_dto.atomic_enrichment matched_tests: List[AtomicTest] = [] missing_tests: List[UUID4] = [] @@ -254,7 +254,7 @@ def mapAtomicGuidsToAtomicTests(cls, v: List[UUID4], info: ValidationInfo) -> Li badly_formatted_guids.append(str(atomic_guid_str)) continue try: - matched_tests.append(AtomicTest.getAtomicByAtomicGuid(atomic_guid, all_tests)) + matched_tests.append(atomic_enrichment.getAtomic(atomic_guid)) except Exception: missing_tests.append(atomic_guid) @@ -265,7 +265,7 @@ def mapAtomicGuidsToAtomicTests(cls, v: List[UUID4], info: ValidationInfo) -> Li f"\n\tPlease review the output above for potential exception(s) when parsing the " "Atomic Red Team Repo." "\n\tVerify that these auto_generated_guid exist and try updating/pulling the " - f"repo again.: {[str(guid) for guid in missing_tests]}" + f"repo again: {[str(guid) for guid in missing_tests]}" ) else: missing_tests_string = "" @@ -278,6 +278,6 @@ def mapAtomicGuidsToAtomicTests(cls, v: List[UUID4], info: ValidationInfo) -> Li raise ValueError(f"{bad_guids_string}{missing_tests_string}") elif len(missing_tests) > 0: - print(missing_tests_string) + raise ValueError(missing_tests_string) return matched_tests + [AtomicTest.AtomicTestWhenTestIsMissing(test) for test in missing_tests]