From 5d5f00a4ff3b78319fbfe35f96be65b9d05d2725 Mon Sep 17 00:00:00 2001 From: ljstella Date: Thu, 22 Aug 2024 14:31:57 -0500 Subject: [PATCH 1/8] Removal of more bits of SSA --- contentctl/api.py | 2 +- contentctl/input/director.py | 11 +- contentctl/objects/enums.py | 1 - contentctl/objects/ssa_detection.py | 156 -------------------- contentctl/objects/ssa_detection_tags.py | 138 ----------------- contentctl/output/new_content_yml_output.py | 13 +- 6 files changed, 7 insertions(+), 314 deletions(-) delete mode 100644 contentctl/objects/ssa_detection.py delete mode 100644 contentctl/objects/ssa_detection_tags.py diff --git a/contentctl/api.py b/contentctl/api.py index 5de988ec..8c996549 100644 --- a/contentctl/api.py +++ b/contentctl/api.py @@ -126,7 +126,7 @@ def update_config(config:Union[test,test_servers], **key_value_updates:dict[str, def content_to_dict(director:DirectorOutputDto)->dict[str,list[dict[str,Any]]]: output_dict:dict[str,list[dict[str,Any]]] = {} for contentType in ['detections','stories','baselines','investigations', - 'playbooks','macros','lookups','deployments','ssa_detections']: + 'playbooks','macros','lookups','deployments',]: output_dict[contentType] = [] t:list[SecurityContentObject] = getattr(director,contentType) diff --git a/contentctl/input/director.py b/contentctl/input/director.py index 0e27add6..a75090eb 100644 --- a/contentctl/input/director.py +++ b/contentctl/input/director.py @@ -18,7 +18,6 @@ from contentctl.objects.deployment import Deployment from contentctl.objects.macro import Macro from contentctl.objects.lookup import Lookup -from contentctl.objects.ssa_detection import SSADetection from contentctl.objects.atomic import AtomicTest from contentctl.objects.security_content_object import SecurityContentObject from contentctl.objects.data_source import DataSource @@ -33,10 +32,7 @@ from contentctl.objects.enums import DetectionStatus from contentctl.helper.utils import Utils -from contentctl.objects.enums import SecurityContentType -from contentctl.objects.enums import DetectionStatus -from contentctl.helper.utils import Utils @dataclass @@ -60,10 +56,7 @@ class DirectorOutputDto: def addContentToDictMappings(self, content: SecurityContentObject): content_name = content.name - if isinstance(content, SSADetection): - # Since SSA detections may have the same name as ESCU detection, - # for this function we prepend 'SSA ' to the name. - content_name = f"SSA {content_name}" + if content_name in self.name_to_content_map: raise ValueError( @@ -149,7 +142,7 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None: os.path.join(self.input_dto.path, str(contentType.name)) ) security_content_files = [ - f for f in files if not f.name.startswith("ssa___") + f for f in files ] else: raise (Exception(f"Cannot createSecurityContent for unknown product.")) diff --git a/contentctl/objects/enums.py b/contentctl/objects/enums.py index fa294302..4e9f1146 100644 --- a/contentctl/objects/enums.py +++ b/contentctl/objects/enums.py @@ -54,7 +54,6 @@ class SecurityContentType(enum.Enum): deployments = 7 investigations = 8 unit_tests = 9 - ssa_detections = 10 data_sources = 11 # Bringing these changes back in line will take some time after diff --git a/contentctl/objects/ssa_detection.py b/contentctl/objects/ssa_detection.py deleted file mode 100644 index 036f0b77..00000000 --- a/contentctl/objects/ssa_detection.py +++ /dev/null @@ -1,156 +0,0 @@ -from __future__ import annotations -import uuid -import string -import requests -import time -from pydantic import BaseModel, validator, root_validator -from dataclasses import dataclass -from datetime import datetime -from typing import Union -import re - -from contentctl.objects.abstract_security_content_objects.detection_abstract import Detection_Abstract -from contentctl.objects.enums import AnalyticsType -from contentctl.objects.enums import DataModel -from contentctl.objects.enums import DetectionStatus -from contentctl.objects.deployment import Deployment -from contentctl.objects.ssa_detection_tags import SSADetectionTags -from contentctl.objects.unit_test_ssa import UnitTestSSA -from contentctl.objects.unit_test_old import UnitTestOld -from contentctl.objects.macro import Macro -from contentctl.objects.lookup import Lookup -from contentctl.objects.baseline import Baseline -from contentctl.objects.playbook import Playbook -from contentctl.helper.link_validator import LinkValidator -from contentctl.objects.enums import SecurityContentType - -class SSADetection(BaseModel): - # detection spec - name: str - id: str - version: int - date: str - author: str - type: AnalyticsType = ... - status: DetectionStatus = ... - detection_type: str = None - description: str - data_source: list[str] - search: Union[str, dict] - how_to_implement: str - known_false_positives: str - references: list - tags: SSADetectionTags - tests: list[UnitTestSSA] = None - - # enrichments - annotations: dict = None - risk: list = None - mappings: dict = None - file_path: str = None - source: str = None - test: Union[UnitTestSSA, dict, UnitTestOld] = None - runtime: str = None - internalVersion: int = None - - # @validator('name')v - # def name_max_length(cls, v, values): - # if len(v) > 67: - # raise ValueError('name is longer then 67 chars: ' + v) - # return v - - class Config: - use_enum_values = True - - ''' - @validator("name") - def name_invalid_chars(cls, v): - invalidChars = set(string.punctuation.replace("-", "")) - if any(char in invalidChars for char in v): - raise ValueError("invalid chars used in name: " + v) - return v - - @validator("id") - def id_check(cls, v, values): - try: - uuid.UUID(str(v)) - except: - raise ValueError("uuid is not valid: " + values["name"]) - return v - - @validator("date") - def date_valid(cls, v, values): - try: - datetime.strptime(v, "%Y-%m-%d") - except: - raise ValueError("date is not in format YYYY-MM-DD: " + values["name"]) - return v - - # @validator("type") - # def type_valid(cls, v, values): - # if v.lower() not in [el.name.lower() for el in AnalyticsType]: - # raise ValueError("not valid analytics type: " + values["name"]) - # return v - - @validator("description", "how_to_implement") - def encode_error(cls, v, values, field): - try: - v.encode("ascii") - except UnicodeEncodeError: - raise ValueError("encoding error in " + field.name + ": " + values["name"]) - return v - - # @root_validator - # def search_validation(cls, values): - # if 'ssa_' not in values['file_path']: - # if not '_filter' in values['search']: - # raise ValueError('filter macro missing in: ' + values["name"]) - # if any(x in values['search'] for x in ['eventtype=', 'sourcetype=', ' source=', 'index=']): - # if not 'index=_internal' in values['search']: - # raise ValueError('Use source macro instead of eventtype, sourcetype, source or index in detection: ' + values["name"]) - # return values - - @root_validator - def name_max_length(cls, values): - # Check max length only for ESCU searches, SSA does not have that constraint - if "ssa_" not in values["file_path"]: - if len(values["name"]) > 67: - raise ValueError("name is longer then 67 chars: " + values["name"]) - return values - - - @root_validator - def new_line_check(cls, values): - # Check if there is a new line in description and how to implement that is not escaped - pattern = r'(? 'CIS 20'): {values['name']}") - return v - - @validator('nist') - def tags_nist(cls, v, values): - # Sourced Courtest of NIST: https://www.nist.gov/system/files/documents/cyberframework/cybersecurity-framework-021214.pdf (Page 19) - IDENTIFY = [f'ID.{category}' for category in ["AM", "BE", "GV", "RA", "RM"] ] - PROTECT = [f'PR.{category}' for category in ["AC", "AT", "DS", "IP", "MA", "PT"]] - DETECT = [f'DE.{category}' for category in ["AE", "CM", "DP"] ] - RESPOND = [f'RS.{category}' for category in ["RP", "CO", "AN", "MI", "IM"] ] - RECOVER = [f'RC.{category}' for category in ["RP", "IM", "CO"] ] - ALL_NIST_CATEGORIES = IDENTIFY + PROTECT + DETECT + RESPOND + RECOVER - - - for value in v: - if not value in ALL_NIST_CATEGORIES: - raise ValueError(f"NIST Category '{value}' is not a valid category") - return v - - @validator('confidence') - def tags_confidence(cls, v, values): - v = int(v) - if not (v > 0 and v <= 100): - raise ValueError('confidence score is out of range 1-100.' ) - else: - return v - - - @validator('impact') - def tags_impact(cls, v, values): - if not (v > 0 and v <= 100): - raise ValueError('impact score is out of range 1-100.') - else: - return v - - @validator('kill_chain_phases') - def tags_kill_chain_phases(cls, v, values): - valid_kill_chain_phases = SES_KILL_CHAIN_MAPPINGS.keys() - for value in v: - if value not in valid_kill_chain_phases: - raise ValueError('kill chain phase not valid. Valid options are ' + str(valid_kill_chain_phases)) - return v - - @validator('mitre_attack_id') - def tags_mitre_attack_id(cls, v, values): - pattern = 'T[0-9]{4}' - for value in v: - if not re.match(pattern, value): - raise ValueError('Mitre Attack ID are not following the pattern Txxxx:' ) - return v - - - - @validator('risk_score') - def tags_calculate_risk_score(cls, v, values): - calculated_risk_score = round(values['impact'] * values['confidence'] / 100) - if calculated_risk_score != int(v): - raise ValueError(f"Risk Score must be calculated as round(confidence * impact / 100)" - f"\n Expected risk_score={calculated_risk_score}, found risk_score={int(v)}: {values['name']}") - return v - - - @model_validator(mode="after") - def tags_observable(self): - valid_roles = SES_OBSERVABLE_ROLE_MAPPING.keys() - valid_types = SES_OBSERVABLE_TYPE_MAPPING.keys() - - for value in self.observable: - if value['type'] in valid_types: - if 'Splunk Behavioral Analytics' in self.product: - continue - - if 'role' not in value: - raise ValueError('Observable role is missing') - for role in value['role']: - if role not in valid_roles: - raise ValueError(f'Observable role ' + role + ' not valid. Valid options are {str(valid_roles)}') - else: - raise ValueError(f'Observable type ' + value['type'] + ' not valid. Valid options are {str(valid_types)}') - return self \ No newline at end of file diff --git a/contentctl/output/new_content_yml_output.py b/contentctl/output/new_content_yml_output.py index df55dd1c..38730b37 100644 --- a/contentctl/output/new_content_yml_output.py +++ b/contentctl/output/new_content_yml_output.py @@ -39,11 +39,8 @@ def convertNameToFileName(self, name: str, product: list): .replace('.','_') \ .replace('/','_') \ .lower() - if 'Splunk Behavioral Analytics' in product: - - file_name = 'ssa___' + file_name + '.yml' - else: - file_name = file_name + '.yml' + + file_name = file_name + '.yml' return file_name @@ -54,8 +51,6 @@ def convertNameToTestFileName(self, name: str, product: list): .replace('.','_') \ .replace('/','_') \ .lower() - if 'Splunk Behavioral Analytics' in product: - file_name = 'ssa___' + file_name + '.test.yml' - else: - file_name = file_name + '.test.yml' + + file_name = file_name + '.test.yml' return file_name \ No newline at end of file From 473fd9dd2deabab72071811a9c71a337a1bdb6e7 Mon Sep 17 00:00:00 2001 From: pyth0n1c <87383215+pyth0n1c@users.noreply.github.com> Date: Fri, 6 Sep 2024 10:35:17 -0700 Subject: [PATCH 2/8] fix some whitespace issues when formatting a string field for a conf file --- contentctl/output/conf_writer.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/contentctl/output/conf_writer.py b/contentctl/output/conf_writer.py index 2c8e82f7..b103a291 100644 --- a/contentctl/output/conf_writer.py +++ b/contentctl/output/conf_writer.py @@ -34,7 +34,10 @@ def escapeNewlines(obj:Any): # Failing to do so will result in an improperly formatted conf files that # cannot be parsed if isinstance(obj,str): - return obj.replace(f"\n"," \\\n") + # Remove leading and trailing characters. Conf parsers may erroneously + # Parse fields if they have leading or trailing newlines/whitespace and we + # probably don't want that anyway as it doesn't look good in output + return obj.strip().replace(f"\n"," \\\n") else: return obj From 148c12794b62f3d811b73e9106b53bc390670d2a Mon Sep 17 00:00:00 2001 From: pyth0n1c <87383215+pyth0n1c@users.noreply.github.com> Date: Fri, 6 Sep 2024 10:45:05 -0700 Subject: [PATCH 3/8] Update comment/docstring on function --- .../detection_abstract.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index 075fb7a2..34904a9f 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -74,6 +74,8 @@ class Detection_Abstract(SecurityContentObject): data_source_objects: list[DataSource] = [] + + @field_validator("search", mode="before") @classmethod def validate_presence_of_filter_macro(cls, value:str, info:ValidationInfo)->str: @@ -83,15 +85,13 @@ def validate_presence_of_filter_macro(cls, value:str, info:ValidationInfo)->str: Args: - value (Union[str, dict[str,Any]]): The search. It can either be a string (and should be - SPL or a dict, in which case it is Sigma-formatted. + value (str): The SPL search. It must be an SPL-formatted string. info (ValidationInfo): The validation info can contain a number of different objects. Today it only contains the director. Returns: - Union[str, dict[str,Any]]: The search, either in sigma or SPL format. - """ - + str: The search, as an SPL formatted string. + """ # Otherwise, the search is SPL. From 225840012deaee4576027e78c8100729577ae673 Mon Sep 17 00:00:00 2001 From: ljstella Date: Thu, 12 Sep 2024 11:59:11 -0500 Subject: [PATCH 4/8] Remove unit_test_ssa and unit_test_old --- contentctl/objects/unit_test_old.py | 10 ---------- contentctl/objects/unit_test_ssa.py | 31 ----------------------------- 2 files changed, 41 deletions(-) delete mode 100644 contentctl/objects/unit_test_old.py delete mode 100644 contentctl/objects/unit_test_ssa.py diff --git a/contentctl/objects/unit_test_old.py b/contentctl/objects/unit_test_old.py deleted file mode 100644 index 3858e01a..00000000 --- a/contentctl/objects/unit_test_old.py +++ /dev/null @@ -1,10 +0,0 @@ -from __future__ import annotations -from pydantic import BaseModel - - -from contentctl.objects.unit_test_ssa import UnitTestSSA - - -class UnitTestOld(BaseModel): - name: str - tests: list[UnitTestSSA] \ No newline at end of file diff --git a/contentctl/objects/unit_test_ssa.py b/contentctl/objects/unit_test_ssa.py deleted file mode 100644 index 150b9efe..00000000 --- a/contentctl/objects/unit_test_ssa.py +++ /dev/null @@ -1,31 +0,0 @@ -from __future__ import annotations -from typing import Optional -from pydantic import BaseModel, Field -from pydantic import Field - - -class UnitTestAttackDataSSA(BaseModel): - file_name:Optional[str] = None - data: str = Field(...) - # TODO - should source and sourcetype should be mapped to a list - # of supported source and sourcetypes in a given environment? - source: str = Field(...) - - sourcetype: Optional[str] = None - - -class UnitTestSSA(BaseModel): - """ - A unit test for a detection - """ - name: str - - # The attack data to be ingested for the unit test - attack_data: list[UnitTestAttackDataSSA] = Field(...) - - - - - - - From 2170881c2d5449cd6f8f0d2a14ff2fac33492077 Mon Sep 17 00:00:00 2001 From: ljstella Date: Thu, 12 Sep 2024 12:07:11 -0500 Subject: [PATCH 5/8] Removal from APP enum --- contentctl/objects/enums.py | 1 - 1 file changed, 1 deletion(-) diff --git a/contentctl/objects/enums.py b/contentctl/objects/enums.py index 4e9f1146..240ba905 100644 --- a/contentctl/objects/enums.py +++ b/contentctl/objects/enums.py @@ -68,7 +68,6 @@ class SecurityContentType(enum.Enum): class SecurityContentProduct(enum.Enum): SPLUNK_APP = 1 - SSA = 2 API = 3 CUSTOM = 4 From 2a2eeae84451ece3be27fb033cdb994956bc0ee2 Mon Sep 17 00:00:00 2001 From: pyth0n1c <87383215+pyth0n1c@users.noreply.github.com> Date: Fri, 13 Sep 2024 13:54:39 -0700 Subject: [PATCH 6/8] Remove extra line in python class --- .../abstract_security_content_objects/detection_abstract.py | 1 - 1 file changed, 1 deletion(-) diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index 34904a9f..6e3a990e 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -75,7 +75,6 @@ class Detection_Abstract(SecurityContentObject): data_source_objects: list[DataSource] = [] - @field_validator("search", mode="before") @classmethod def validate_presence_of_filter_macro(cls, value:str, info:ValidationInfo)->str: From b60eeb4b2971023fc06dedecc650c97a3bc928f2 Mon Sep 17 00:00:00 2001 From: pyth0n1c <87383215+pyth0n1c@users.noreply.github.com> Date: Fri, 13 Sep 2024 13:55:53 -0700 Subject: [PATCH 7/8] remove extra line --- .../abstract_security_content_objects/detection_abstract.py | 1 - 1 file changed, 1 deletion(-) diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index 6e3a990e..02d2756f 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -74,7 +74,6 @@ class Detection_Abstract(SecurityContentObject): data_source_objects: list[DataSource] = [] - @field_validator("search", mode="before") @classmethod def validate_presence_of_filter_macro(cls, value:str, info:ValidationInfo)->str: From f348bc9e1941c903ed0e7582c492dbc6351f93a9 Mon Sep 17 00:00:00 2001 From: ljstella Date: Fri, 13 Sep 2024 16:45:31 -0500 Subject: [PATCH 8/8] Missed one --- contentctl/objects/config.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/contentctl/objects/config.py b/contentctl/objects/config.py index 93c03660..d07e2459 100644 --- a/contentctl/objects/config.py +++ b/contentctl/objects/config.py @@ -233,9 +233,6 @@ def getPackageFilePath(self, include_version:bool=False)->pathlib.Path: return self.getBuildDir() / f"{self.app.appid}-{self.app.version}.tar.gz" else: return self.getBuildDir() / f"{self.app.appid}-latest.tar.gz" - - def getSSAPath(self)->pathlib.Path: - return self.getBuildDir() / "ssa" def getAPIPath(self)->pathlib.Path: return self.getBuildDir() / "api"