Skip to content

Commit

Permalink
Merge pull request #291 from splunk/simple_enrichment_update
Browse files Browse the repository at this point in the history
Require mitre/cti repo for enrichments
  • Loading branch information
pyth0n1c authored Sep 18, 2024
2 parents bf6fe08 + 3149d8d commit 9ccd66b
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 178 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/test_against_escu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ jobs:
poetry install --no-interaction
- name: Clone the AtomicRedTeam Repo (for extended validation)
- name: Clone the AtomicRedTeam Repo and the Mitre/CTI repos for testing enrichments
run: |
cd security_content
git clone --depth 1 https://github.com/redcanaryco/atomic-red-team
git clone --single-branch https://github.com/redcanaryco/atomic-red-team external_repos/atomic-red-team
git clone --single-branch https://github.com/mitre/cti external_repos/cti
# We do not separately run validate and build
Expand Down
10 changes: 3 additions & 7 deletions contentctl/actions/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,16 @@
from contentctl.objects.config import validate
from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment
from contentctl.objects.atomic import AtomicTest
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.helper.utils import Utils
from contentctl.objects.data_source import DataSource
from contentctl.helper.splunk_app import SplunkApp


class Validate:
def execute(self, input_dto: validate) -> DirectorOutputDto:

def execute(self, input_dto: validate) -> DirectorOutputDto:
director_output_dto = DirectorOutputDto(
AtomicTest.getAtomicTestsFromArtRepo(
repo_path=input_dto.getAtomicRedTeamRepoPath(),
enabled=input_dto.enrichments,
),
AtomicEnrichment.getAtomicEnrichment(input_dto),
AttackEnrichment.getAttackEnrichment(input_dto),
CveEnrichment.getCveEnrichment(input_dto),
[],
Expand Down
3 changes: 3 additions & 0 deletions contentctl/contentctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ def main():
test_common_func(config)
else:
raise Exception(f"Unknown command line type '{type(config).__name__}'")
except FileNotFoundError as e:
print(e)
sys.exit(1)
except Exception as e:
if config is None:
print("There was a serious issue where the config file could not be created.\n"
Expand Down
130 changes: 49 additions & 81 deletions contentctl/enrichments/attack_enrichment.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@

from __future__ import annotations
import csv
import os
import sys
from attackcti import attack_client
import logging
from pydantic import BaseModel, Field
from pydantic import BaseModel
from dataclasses import field
from typing import Annotated,Any
from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment
from typing import Any
from pathlib import Path
from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment, MitreTactics
from contentctl.objects.config import validate
from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE
logging.getLogger('taxii2client').setLevel(logging.CRITICAL)
Expand All @@ -21,84 +20,82 @@ class AttackEnrichment(BaseModel):
@staticmethod
def getAttackEnrichment(config:validate)->AttackEnrichment:
enrichment = AttackEnrichment(use_enrichment=config.enrichments)
_ = enrichment.get_attack_lookup(str(config.path))
_ = enrichment.get_attack_lookup(config.mitre_cti_repo_path, config.enrichments)
return enrichment

def getEnrichmentByMitreID(self, mitre_id:MITRE_ATTACK_ID_TYPE)->MitreAttackEnrichment:
if not self.use_enrichment:
raise Exception(f"Error, trying to add Mitre Enrichment, but use_enrichment was set to False")
raise Exception("Error, trying to add Mitre Enrichment, but use_enrichment was set to False")

enrichment = self.data.get(mitre_id, None)
if enrichment is not None:
return enrichment
else:
raise Exception(f"Error, Unable to find Mitre Enrichment for MitreID {mitre_id}")

def addMitreIDViaGroupNames(self, technique:dict, tactics:list[str], groupNames:list[str])->None:
def addMitreIDViaGroupNames(self, technique:dict[str,Any], tactics:list[str], groupNames:list[str])->None:
technique_id = technique['technique_id']
technique_obj = technique['technique']
tactics.sort()

if technique_id in self.data:
raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'")
self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id,
mitre_attack_technique=technique_obj,
mitre_attack_tactics=tactics,
mitre_attack_groups=groupNames,
mitre_attack_group_objects=[])
self.data[technique_id] = MitreAttackEnrichment.model_validate({'mitre_attack_id':technique_id,
'mitre_attack_technique':technique_obj,
'mitre_attack_tactics':tactics,
'mitre_attack_groups':groupNames,
'mitre_attack_group_objects':[]})

def addMitreIDViaGroupObjects(self, technique:dict, tactics:list[str], groupObjects:list[dict[str,Any]])->None:
def addMitreIDViaGroupObjects(self, technique:dict[str,Any], tactics:list[MitreTactics], groupDicts:list[dict[str,Any]])->None:
technique_id = technique['technique_id']
technique_obj = technique['technique']
tactics.sort()

groupNames:list[str] = sorted([group['group'] for group in groupObjects])
groupNames:list[str] = sorted([group['group'] for group in groupDicts])

if technique_id in self.data:
raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'")
self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id,
mitre_attack_technique=technique_obj,
mitre_attack_tactics=tactics,
mitre_attack_groups=groupNames,
mitre_attack_group_objects=groupObjects)

self.data[technique_id] = MitreAttackEnrichment.model_validate({'mitre_attack_id': technique_id,
'mitre_attack_technique': technique_obj,
'mitre_attack_tactics': tactics,
'mitre_attack_groups': groupNames,
'mitre_attack_group_objects': groupDicts})


def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cached_or_offline: bool = False, skip_enrichment:bool = False) -> dict:
if not self.use_enrichment:
return {}
print("Getting MITRE Attack Enrichment Data. This may take some time...")
attack_lookup = dict()
file_path = os.path.join(input_path, "app_template", "lookups", "mitre_enrichment.csv")

if skip_enrichment is True:
print("Skipping enrichment")
def get_attack_lookup(self, input_path: Path, enrichments:bool = False) -> dict[str,MitreAttackEnrichment]:
attack_lookup:dict[str,MitreAttackEnrichment] = {}
if not enrichments:
return attack_lookup

try:

if force_cached_or_offline is True:
raise(Exception("WARNING - Using cached MITRE Attack Enrichment. Attack Enrichment may be out of date. Only use this setting for offline environments and development purposes."))
print(f"\r{'Client'.rjust(23)}: [{0:3.0f}%]...", end="", flush=True)
lift = attack_client()
print(f"\r{'Client'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)
print(f"Performing MITRE Enrichment using the repository at {input_path}...",end="", flush=True)
# The existence of the input_path is validated during cli argument validation, but it is
# possible that the repo is in the wrong format. If the following directories do not
# exist, then attack_client will fall back to resolving via REST API. We do not
# want this as it is slow and error prone, so we will force an exception to
# be generated.
enterprise_path = input_path/"enterprise-attack"
mobile_path = input_path/"ics-attack"
ics_path = input_path/"mobile-attack"
if not (enterprise_path.is_dir() and mobile_path.is_dir() and ics_path.is_dir()):
raise FileNotFoundError("One or more of the following paths does not exist: "
f"{[str(enterprise_path),str(mobile_path),str(ics_path)]}. "
f"Please ensure that the {input_path} directory "
"has been git cloned correctly.")
lift = attack_client(
local_paths= {
"enterprise":str(enterprise_path),
"mobile":str(mobile_path),
"ics":str(ics_path)
}
)

print(f"\r{'Techniques'.rjust(23)}: [{0.0:3.0f}%]...", end="", flush=True)
all_enterprise_techniques = lift.get_enterprise_techniques(stix_format=False)

print(f"\r{'Techniques'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)

print(f"\r{'Relationships'.rjust(23)}: [{0.0:3.0f}%]...", end="", flush=True)
enterprise_relationships = lift.get_enterprise_relationships(stix_format=False)
print(f"\r{'Relationships'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)

print(f"\r{'Groups'.rjust(23)}: [{0:3.0f}%]...", end="", flush=True)
enterprise_groups = lift.get_enterprise_groups(stix_format=False)
print(f"\r{'Groups'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)


for index, technique in enumerate(all_enterprise_techniques):
progress_percent = ((index+1)/len(all_enterprise_techniques)) * 100
if (sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()):
print(f"\r\t{'MITRE Technique Progress'.rjust(23)}: [{progress_percent:3.0f}%]...", end="", flush=True)
for technique in all_enterprise_techniques:
apt_groups:list[dict[str,Any]] = []
for relationship in enterprise_relationships:
if (relationship['target_object'] == technique['id']) and relationship['source_object'].startswith('intrusion-set'):
Expand All @@ -115,39 +112,10 @@ def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cach
self.addMitreIDViaGroupObjects(technique, tactics, apt_groups)
attack_lookup[technique['technique_id']] = {'technique': technique['technique'], 'tactics': tactics, 'groups': apt_groups}

if store_csv:
f = open(file_path, 'w')
writer = csv.writer(f)
writer.writerow(['mitre_id', 'technique', 'tactics' ,'groups'])
for key in attack_lookup.keys():
if len(attack_lookup[key]['groups']) == 0:
groups = 'no'
else:
groups = '|'.join(attack_lookup[key]['groups'])

writer.writerow([
key,
attack_lookup[key]['technique'],
'|'.join(attack_lookup[key]['tactics']),
groups
])

f.close()


except Exception as err:
print(f'\nError: {str(err)}')
print('Use local copy app_template/lookups/mitre_enrichment.csv')
with open(file_path, mode='r') as inp:
reader = csv.reader(inp)
attack_lookup = {rows[0]:{'technique': rows[1], 'tactics': rows[2].split('|'), 'groups': rows[3].split('|')} for rows in reader}
attack_lookup.pop('mitre_id')
for key in attack_lookup.keys():
technique_input = {'technique_id': key , 'technique': attack_lookup[key]['technique'] }
tactics_input = attack_lookup[key]['tactics']
groups_input = attack_lookup[key]['groups']
self.addMitreIDViaGroupNames(technique=technique_input, tactics=tactics_input, groups=groups_input)



raise Exception(f"Error getting MITRE Enrichment: {str(err)}")

print("Done!")
return attack_lookup
6 changes: 3 additions & 3 deletions contentctl/input/director.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from contentctl.objects.deployment import Deployment
from contentctl.objects.macro import Macro
from contentctl.objects.lookup import Lookup
from contentctl.objects.atomic import AtomicTest
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.data_source import DataSource
from contentctl.objects.event_source import EventSource
Expand All @@ -39,7 +39,7 @@
class DirectorOutputDto:
# Atomic Tests are first because parsing them
# is far quicker than attack_enrichment
atomic_tests: None | list[AtomicTest]
atomic_enrichment: AtomicEnrichment
attack_enrichment: AttackEnrichment
cve_enrichment: CveEnrichment
detections: list[Detection]
Expand Down Expand Up @@ -145,7 +145,7 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
f for f in files
]
else:
raise (Exception(f"Cannot createSecurityContent for unknown product."))
raise (Exception(f"Cannot createSecurityContent for unknown product {contentType}."))

validation_errors = []

Expand Down
Loading

0 comments on commit 9ccd66b

Please sign in to comment.