diff --git a/README.md b/README.md
index 3d24c873..239eee12 100644
--- a/README.md
+++ b/README.md
@@ -134,7 +134,7 @@ This section is under active development. It will allow you to a [MITRE Map](ht
 Choose TYPE {detection, story} to create new content for the Content Pack. The tool will interactively ask a series of questions required for generating a basic piece of content and automatically add it to the Content Pack.
 
 ### contentctl inspect
-This section is under development. It will enable the user to perform an appinspect of the content pack in preparation for deployment onto a Splunk Instance or via Splunk Cloud.
+This section is under development. The inspect action performs a number of post-build validations. Primarily, it enables the user to run appinspect against the content pack in preparation for deployment onto a Splunk instance or Splunk Cloud. It also compares detections in the new build against a prior build, confirming that any changed detections have had their versions incremented (this comparison happens at the savedsearches.conf level, which is why it must run after the build). Please also note that a new version of contentctl may generate a different savedsearches.conf even when no content has changed in the YML (e.g. new keys at the .conf level), which will likewise require bumping versions in the YML files.
 
 ### contentctl deploy
 The reason to build content is so that it can be deployed to your environment. However, deploying content to multiple servers and different types of infrastructure can be tricky and time-consuming. contentctl makes this easy by supporting a number of different deployment mechanisms. Deployment targets can be defined in [contentctl.yml](/contentctl/templates/contentctl_default.yml).
diff --git a/contentctl/actions/inspect.py b/contentctl/actions/inspect.py
index 9c46abae..38bc2b23 100644
--- a/contentctl/actions/inspect.py
+++ b/contentctl/actions/inspect.py
@@ -1,78 +1,86 @@
 import sys
-
-
 from dataclasses import dataclass
-
 import pathlib
 import json
 import datetime
+import timeit
+import time
 
-
-from contentctl.objects.config import inspect
 from requests import Session, post, get
 from requests.auth import HTTPBasicAuth
-import timeit
-import time
+
+from contentctl.objects.config import inspect
+from contentctl.objects.savedsearches_conf import SavedsearchesConf
+from contentctl.objects.errors import (
+    MetadataValidationError,
+    DetectionIDError,
+    DetectionMissingError,
+    VersionDecrementedError,
+    VersionBumpingError
+)
+
+
 @dataclass(frozen=True)
 class InspectInputDto:
-    config:inspect
+    config: inspect
 
 
 class Inspect:
 
     def execute(self, config: inspect) -> str:
-        if config.build_app or config.build_api:
-
+        if config.build_app or config.build_api:
+
             self.inspectAppCLI(config)
             appinspect_token = self.inspectAppAPI(config)
-
-
+
+            if config.enable_metadata_validation:
+                self.check_detection_metadata(config)
+            else:
+                print("🟡 Detection metadata validation disabled, skipping.")
+
             return appinspect_token
 
         else:
-            raise Exception("Inspect only supported for app and api build targets")
-
-    def getElapsedTime(self, startTime:float)->datetime.timedelta:
-        return datetime.timedelta(seconds=round(timeit.default_timer() - startTime))
+            raise Exception("Inspect only supported for app and api build targets")
 
+    def getElapsedTime(self, startTime: float) -> datetime.timedelta:
+        return datetime.timedelta(seconds=round(timeit.default_timer() - startTime))
 
-    def inspectAppAPI(self, config: inspect)->str:
+    def inspectAppAPI(self, config: inspect) -> str:
        session = Session()
        session.auth = HTTPBasicAuth(config.splunk_api_username, config.splunk_api_password)
        if config.stack_type not in ['victoria', 'classic']:
            raise Exception(f"stack_type MUST be either 'classic' or 'victoria', NOT '{config.stack_type}'")
-
+
        APPINSPECT_API_LOGIN = "https://api.splunk.com/2.0/rest/login/splunk"
-
-
-
+
        res = session.get(APPINSPECT_API_LOGIN)
-        #If login failed or other failure, raise an exception
+        # If login failed or other failure, raise an exception
        res.raise_for_status()
-
-        authorization_bearer = res.json().get("data",{}).get("token",None)
+
+        authorization_bearer = res.json().get("data", {}).get("token", None)
        APPINSPECT_API_VALIDATION_REQUEST = "https://appinspect.splunk.com/v1/app/validate"
        headers = {
            "Authorization": f"bearer {authorization_bearer}",
            "Cache-Control": "no-cache"
        }
-
+
        package_path = config.getPackageFilePath(include_version=False)
        if not package_path.is_file():
            raise Exception(f"Cannot run Appinspect API on App '{config.app.title}' - "
                            f"no package exists as expected path '{package_path}'.\nAre you "
                            "trying to 'contentctl deploy_acs' the package BEFORE running 'contentctl build'?")
-
+
        files = {
-            "app_package": open(package_path,"rb"),
-            "included_tags":(None,"cloud")
-        }
-
+            "app_package": open(package_path, "rb"),
+            "included_tags": (None, "cloud")
+        }
+
        res = post(APPINSPECT_API_VALIDATION_REQUEST, headers=headers, files=files)
        res.raise_for_status()
-        request_id = res.json().get("request_id",None)
+        request_id = res.json().get("request_id", None)
        APPINSPECT_API_VALIDATION_STATUS = f"https://appinspect.splunk.com/v1/app/validate/status/{request_id}?included_tags=private_{config.stack_type}"
        headers = headers = {
            "Authorization": f"bearer {authorization_bearer}"
@@ -83,10 +91,10 @@ def inspectAppAPI(self, config: inspect)->str:
        # checking many times when we know it will take at least 40 seconds to run.
        iteration_wait_time = 40
        while True:
-
+
            res = get(APPINSPECT_API_VALIDATION_STATUS, headers=headers)
            res.raise_for_status()
-            status = res.json().get("status",None)
+            status = res.json().get("status", None)
            if status in ["PROCESSING", "PREPARING"]:
                print(f"[{self.getElapsedTime(startTime)}] Appinspect API is {status}...")
                time.sleep(iteration_wait_time)
@@ -97,12 +105,10 @@ def inspectAppAPI(self, config: inspect)->str:
                break
            else:
                raise Exception(f"Error - Unknown Appinspect API status '{status}'")
-
-
-        #We have finished running appinspect, so get the report
+        # We have finished running appinspect, so get the report
        APPINSPECT_API_REPORT = f"https://appinspect.splunk.com/v1/app/report/{request_id}?included_tags=private_{config.stack_type}"
-        #Get human-readable HTML report
+        # Get human-readable HTML report
        headers = headers = {
            "Authorization": f"bearer {authorization_bearer}",
            "Content-Type": "text/html"
@@ -110,8 +116,8 @@ def inspectAppAPI(self, config: inspect)->str:
        res = get(APPINSPECT_API_REPORT, headers=headers)
        res.raise_for_status()
        report_html = res.content
-
-        #Get JSON report for processing
+
+        # Get JSON report for processing
        headers = headers = {
            "Authorization": f"bearer {authorization_bearer}",
            "Content-Type": "application/json"
@@ -119,33 +125,31 @@ def inspectAppAPI(self, config: inspect)->str:
        res = get(APPINSPECT_API_REPORT, headers=headers)
        res.raise_for_status()
        report_json = res.json()
-
+
        # Just get app path here to avoid long function calls in the open() calls below
        appPath = config.getPackageFilePath(include_version=True)
 
        appinpect_html_path = appPath.with_suffix(appPath.suffix+".appinspect_api_results.html")
        appinspect_json_path = appPath.with_suffix(appPath.suffix+".appinspect_api_results.json")
-        #Use the full path of the app, but update the suffix to include info about appinspect
+        # Use the full path of the app, but update the suffix to include info about appinspect
        with open(appinpect_html_path, "wb") as report:
            report.write(report_html)
        with open(appinspect_json_path, "w") as report:
            json.dump(report_json, report)
-
-
+
        self.parseAppinspectJsonLogFile(appinspect_json_path)
 
        return authorization_bearer
-
-
-    def inspectAppCLI(self, config:inspect)-> None:
-
+
+    def inspectAppCLI(self, config: inspect) -> None:
        try:
-            raise Exception("Local spunk-appinspect Not Supported at this time (you may use the appinspect api). If you would like to locally inspect your app with"
-                            "Python 3.7, 3.8, or 3.9 (with limited support), please refer to:\n"
-                            "\t - https://dev.splunk.com/enterprise/docs/developapps/testvalidate/appinspect/useappinspectclitool/")
+            raise Exception(
+                "Local splunk-appinspect is not supported at this time (you may use the appinspect api). "
+                "If you would like to locally inspect your app with "
+                "Python 3.7, 3.8, or 3.9 (with limited support), please refer to:\n"
+                "\t - https://dev.splunk.com/enterprise/docs/developapps/testvalidate/appinspect/useappinspectclitool/"
+            )
            from splunk_appinspect.main import (
-                validate, MODE_OPTION, APP_PACKAGE_ARGUMENT, OUTPUT_FILE_OPTION,
-                LOG_FILE_OPTION, INCLUDED_TAGS_OPTION, EXCLUDED_TAGS_OPTION,
+                validate, MODE_OPTION, APP_PACKAGE_ARGUMENT, OUTPUT_FILE_OPTION,
+                LOG_FILE_OPTION, INCLUDED_TAGS_OPTION, EXCLUDED_TAGS_OPTION,
                PRECERT_MODE, TEST_MODE)
        except Exception as e:
            print(e)
@@ -153,19 +157,19 @@ def inspectAppCLI(self, config:inspect)-> None:
            # if sys.version_info.major == 3 and sys.version_info.minor > 9:
            #     print("The package splunk-appinspect was not installed due to a current issue with the library on Python3.10+. "
            #           "Please use the following commands to set up a virtualenvironment in a different folder so you may run appinspect manually (if desired):"
-            #           "\n\tpython3.9 -m venv .venv"
+            #           "\n\tpython3.9 -m venv .venv"
            #           "\n\tsource .venv/bin/activate"
            #           "\n\tpython3 -m pip install splunk-appinspect"
-            #           f"\n\tsplunk-appinspect inspect {self.getPackagePath(include_version=False).relative_to(pathlib.Path('.').absolute())} --mode precert")
-
+            #           f"\n\tsplunk-appinspect inspect {self.getPackagePath(include_version=False).relative_to(pathlib.Path('.').absolute())} --mode precert")
+
            # else:
            #     print("splunk-appinspect is only compatable with Python3.9 at this time. Please see the following open issue here: https://github.com/splunk/contentctl/issues/28")
            # print("******WARNING******")
            return
 
        # Note that all tags are available and described here:
-        # https://dev.splunk.com/enterprise/reference/appinspect/appinspecttagreference/
-        # By default, precert mode will run ALL checks. Explicitly included or excluding tags will
+        # https://dev.splunk.com/enterprise/reference/appinspect/appinspecttagreference/
+        # By default, precert mode will run ALL checks. Explicitly including or excluding tags will
        # change this behavior. To give the most thorough inspection, we leave these empty so that
        # ALL checks are run
        included_tags = []
@@ -179,82 +183,176 @@ def inspectAppCLI(self, config:inspect)-> None:
            options_list += [MODE_OPTION, TEST_MODE]
            options_list += [OUTPUT_FILE_OPTION, str(appinspect_output)]
            options_list += [LOG_FILE_OPTION, str(appinspect_logging)]
-
-            #If there are any tags defined, then include them here
+
+            # If there are any tags defined, then include them here
            for opt in included_tags:
                options_list += [INCLUDED_TAGS_OPTION, opt]
            for opt in excluded_tags:
                options_list += [EXCLUDED_TAGS_OPTION, opt]
 
-            cmdline = options_list + [arg[1] for arg in arguments_list]
+            cmdline = options_list + [arg[1] for arg in arguments_list]
            validate(cmdline)
-
+
        except SystemExit as e:
            if e.code == 0:
                # The sys.exit called inside of appinspect validate closes stdin. We need to
                # reopen it.
-                sys.stdin = open("/dev/stdin","r")
+                sys.stdin = open("/dev/stdin", "r")
                print(f"AppInspect passed! Please check [ {appinspect_output} , {appinspect_logging} ] for verbose information.")
            else:
                if sys.version.startswith('3.11') or sys.version.startswith('3.12'):
-                    raise Exception("At this time, AppInspect may fail on valid apps under Python>=3.11 with "
-                                    "the error 'global flags not at the start of the expression at position 1'. "
" "If you encounter this error, please run AppInspect on a version of Python " "<3.11. This issue is currently tracked. Please review the appinspect " "report output above for errors.") - else: - raise Exception("AppInspect Failure - Please review the appinspect report output above for errors.") + else: + raise Exception("AppInspect Failure - Please review the appinspect report output above for errors.") finally: - # appinspect outputs the log in json format, but does not format it to be easier - # to read (it is all in one line). Read back that file and write it so it - # is easier to understand - - #Note that this may raise an exception itself! - self.parseAppinspectJsonLogFile(appinspect_output) - - def parseAppinspectJsonLogFile(self, logfile_path:pathlib.Path, - status_types:list[str] = ["error", "failure", "manual_check", "warning"], - exception_types = ["error","failure","manual_check"] )->None: + # appinspect outputs the log in json format, but does not format it to be easier + # to read (it is all in one line). Read back that file and write it so it + # is easier to understand + + # Note that this may raise an exception itself! + self.parseAppinspectJsonLogFile(appinspect_output) + + def parseAppinspectJsonLogFile( + self, + logfile_path: pathlib.Path, + status_types: list[str] = ["error", "failure", "manual_check", "warning"], + exception_types: list[str] = ["error", "failure", "manual_check"] + ) -> None: if not set(exception_types).issubset(set(status_types)): - raise Exception(f"Error - exception_types {exception_types} MUST be a subset of status_types {status_types}, but it is not") + raise Exception(f"Error - exception_types {exception_types} MUST be a subset of status_types {status_types}, but it is not") with open(logfile_path, "r+") as logfile: j = json.load(logfile) - #Seek back to the beginning of the file. We don't need to clear - #it sice we will always write AT LEAST the same number of characters - #back as we read (due to the addition of whitespace) + # Seek back to the beginning of the file. 
+            # Seek back to the beginning of the file. We don't need to clear
+            # it since we will always write AT LEAST the same number of characters
+            # back as we read (due to the addition of whitespace)
            logfile.seek(0)
            json.dump(j, logfile, indent=3, )
-
+
        reports = j.get("reports", [])
        if len(reports) != 1:
            raise Exception("Expected to find one appinspect report but found 0")
        verbose_errors = []
-
+
        for group in reports[0].get("groups", []):
-            for check in group.get("checks",[]):
-                if check.get("result","") in status_types:
+            for check in group.get("checks", []):
+                if check.get("result", "") in status_types:
                    verbose_errors.append(f" - {check.get('result','')} [{group.get('name','NONAME')}: {check.get('name', 'NONAME')}]")
        verbose_errors.sort()
-
+
        summary = j.get("summary", None)
        if summary is None:
            raise Exception("Missing summary from appinspect report")
        msgs = []
        generated_exception = False
        for key in status_types:
-            if summary.get(key,0)>0:
+            if summary.get(key, 0) > 0:
                msgs.append(f" - {summary.get(key,0)} {key}s")
                if key in exception_types:
                    generated_exception = True
-        if len(msgs)>0 or len(verbose_errors):
+        if len(msgs) > 0 or len(verbose_errors):
            summary = '\n'.join(msgs)
            details = '\n'.join(verbose_errors)
            summary = f"{summary}\nDetails:\n{details}"
            if generated_exception:
-                raise Exception(f"AppInspect found [{','.join(exception_types)}] that MUST be addressed to pass AppInspect API:\n{summary}")
+                raise Exception(f"AppInspect found [{','.join(exception_types)}] that MUST be addressed to pass AppInspect API:\n{summary}")
            else:
-                print(f"AppInspect found [{','.join(status_types)}] that MAY cause a failure during AppInspect API:\n{summary}")
+                print(f"AppInspect found [{','.join(status_types)}] that MAY cause a failure during AppInspect API:\n{summary}")
        else:
            print("AppInspect was successful!")
-
+
        return
+
+    def check_detection_metadata(self, config: inspect) -> None:
+        """
+        Using a previous build, compare the savedsearches.conf files to detect any issues w/
+        detection metadata. **NOTE**: Detection metadata validation can only be performed between
+        two builds with the appropriate metadata structure. In ESCU, this was added as of release
+        v4.39.0, so all current and previous builds for use with this feature must be this version
+        or greater.
+
+        :param config: an inspect config
+        :type config: :class:`contentctl.objects.config.inspect`
+        """
+        # TODO (#282): Ideally, we should inspect the same artifact we're passing around from
+        #   the build stage
+        # Unpack the savedsearches.conf of each app package
+        current_build_conf = SavedsearchesConf.init_from_package(
+            package_path=config.getPackageFilePath(include_version=False),
+            app_name=config.app.label,
+            appid=config.app.appid
+        )
+        previous_build_conf = SavedsearchesConf.init_from_package(
+            package_path=config.get_previous_package_file_path(),
+            app_name=config.app.label,
+            appid=config.app.appid
+        )
+
+        # Compare the conf files
+        validation_errors: dict[str, list[MetadataValidationError]] = {}
+        for rule_name in previous_build_conf.detection_stanzas:
+            validation_errors[rule_name] = []
+            # No detections should be removed from build to build
+            if rule_name not in current_build_conf.detection_stanzas:
+                validation_errors[rule_name].append(DetectionMissingError(rule_name=rule_name))
+                continue
+
+            # Pull out the individual stanza for readability
+            previous_stanza = previous_build_conf.detection_stanzas[rule_name]
+            current_stanza = current_build_conf.detection_stanzas[rule_name]
+
+            # Detection IDs should not change
+            if current_stanza.metadata.detection_id != previous_stanza.metadata.detection_id:
+                validation_errors[rule_name].append(
+                    DetectionIDError(
+                        rule_name=rule_name,
+                        current_id=current_stanza.metadata.detection_id,
+                        previous_id=previous_stanza.metadata.detection_id
+                    )
+                )
+
+            # Versions should never decrement in successive builds
+            if current_stanza.metadata.detection_version < previous_stanza.metadata.detection_version:
+                validation_errors[rule_name].append(
+                    VersionDecrementedError(
+                        rule_name=rule_name,
+                        current_version=current_stanza.metadata.detection_version,
+                        previous_version=previous_stanza.metadata.detection_version
+                    )
+                )
+
+            # Versions need to be bumped if the stanza changes at all
+            if current_stanza.version_should_be_bumped(previous_stanza):
+                validation_errors[rule_name].append(
+                    VersionBumpingError(
+                        rule_name=rule_name,
+                        current_version=current_stanza.metadata.detection_version,
+                        previous_version=previous_stanza.metadata.detection_version
+                    )
+                )
+
+        # Convert our dict mapping to a flat list of errors for use in reporting
+        validation_error_list = [x for inner_list in validation_errors.values() for x in inner_list]
+
+        # Report failure/success
+        print("\nDetection Metadata Validation:")
+        if len(validation_error_list) > 0:
+            # Iterate over each rule and report the failures
+            for rule_name in validation_errors:
+                if len(validation_errors[rule_name]) > 0:
+                    print(f"\t❌ {rule_name}")
+                    for error in validation_errors[rule_name]:
+                        print(f"\t\t🔸 {error.short_message}")
+        else:
+            # If no errors in the list, report success
+            print("\t✅ Detection metadata looks good and all versions were bumped appropriately :)")
+
+        # Raise an ExceptionGroup for all validation issues
+        if len(validation_error_list) > 0:
+            raise ExceptionGroup(
+                "Validation errors when comparing detection stanzas in current and previous build:",
+                validation_error_list
+            )
diff --git a/contentctl/helper/splunk_app.py b/contentctl/helper/splunk_app.py
index 715e3072..0f95f593 100644
--- a/contentctl/helper/splunk_app.py
+++ b/contentctl/helper/splunk_app.py
@@ -1,20 +1,20 @@
-import os
-import time
 import json
+from typing import Optional, Collection
+from pathlib import Path
 import xml.etree.ElementTree as ET
-from typing import List, Tuple, Optional
 from urllib.parse import urlencode
 
 import requests
 import urllib3
 import xmltodict
 from requests.adapters import HTTPAdapter
-from requests.packages.urllib3.util.retry import Retry
+from urllib3.util.retry import Retry
 
 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 
 MAX_RETRY = 3
 
+
 class APIEndPoint:
    """
    Class which contains Static Endpoint
@@ -27,6 +27,7 @@ class APIEndPoint:
    SPLUNK_BASE_GET_UID_REDIRECT = "https://apps.splunk.com/apps/id/{app_name_id}"
    SPLUNK_BASE_APP_INFO = "https://splunkbase.splunk.com/api/v1/app/{app_uid}"
 
+
 class RetryConstant:
    """
    Class which contains Retry Constant
@@ -53,11 +54,11 @@ class InitializationError(Exception):
 
    @staticmethod
    def requests_retry_session(
-        retries=RetryConstant.RETRY_COUNT,
-        backoff_factor=1,
-        status_forcelist=(500, 502, 503, 504),
-        session=None,
-    ):
+        retries: int = RetryConstant.RETRY_COUNT,
+        backoff_factor: int = 1,
+        status_forcelist: Collection[int] = (500, 502, 503, 504),
+        session: requests.Session | None = None,
+    ) -> requests.Session:
        session = session or requests.Session()
        retry = Retry(
            total=retries,
@@ -260,4 +261,134 @@ def set_latest_version_info(self) -> None:
 
        # parse out the version number and fetch the download URL
        self.latest_version = info_url.split("/")[-1]
-        self.latest_version_download_url = self.__fetch_url_latest_version_download(info_url)
\ No newline at end of file
+        self.latest_version_download_url = self.__fetch_url_latest_version_download(info_url)
+
+    def __get_splunk_base_session_token(self, username: str, password: str) -> str:
+        """
+        This method generates a Splunkbase session token
+
+        :param username: Splunkbase username
+        :type username: str
+        :param password: Splunkbase password
+        :type password: str
+
+        :return: Splunkbase session token
+        :rtype: str
+        """
+        # Data payload for fetching the Splunkbase session token
+        payload = urlencode(
+            {
+                "username": username,
+                "password": password,
+            }
+        )
+
+        headers = {
+            "content-type": "application/x-www-form-urlencoded",
+            "cache-control": "no-cache",
+        }
+
+        response = requests.request(
+            "POST",
+            APIEndPoint.SPLUNK_BASE_AUTH_URL,
+            data=payload,
+            headers=headers,
+        )
+
+        token_value = ""
+
+        if response.status_code != 200:
+            msg = (
+                f"Error occurred while executing the rest call for the Splunkbase authentication API: "
+                f"{response.content}"
+            )
+            raise Exception(msg)
+        else:
+            root = ET.fromstring(response.content)
+            token_value = root.find("{http://www.w3.org/2005/Atom}id").text.strip()
+        return token_value
+
+    def download(
+        self,
+        out: Path,
+        username: str,
+        password: str,
+        is_dir: bool = False,
+        overwrite: bool = False
+    ) -> Path:
+        """
+        Given an output path, download the app to the specified location
+
+        :param out: the Path to download the app to
+        :type out: :class:`pathlib.Path`
+        :param username: Splunkbase username
+        :type username: str
+        :param password: Splunkbase password
+        :type password: str
+        :param is_dir: a flag indicating whether out is a directory, otherwise a file (default: False)
+        :type is_dir: bool
+        :param overwrite: a flag indicating whether we can overwrite the file at out or not
+        :type overwrite: bool
+
+        :returns: the Path the download was written to (needed when is_dir is True)
+        :rtype: :class:`pathlib.Path`
+        """
+        # Get the Splunkbase session token
+        token = self.__get_splunk_base_session_token(username, password)
+        response = requests.request(
+            "GET",
+            self.latest_version_download_url,
+            cookies={
+                "sessionid": token
+            }
+        )
+
+        # If the provided output path was a directory we need to try and pull the filename from the
+        # response headers
+        if is_dir:
+            try:
+                # Pull 'Content-Disposition' from the headers
+                content_disposition: str = response.headers['Content-Disposition']
+
+                # Attempt to parse the filename as a KV
+                key, value = content_disposition.strip().split("=")
+                if key != "attachment;filename":
+                    raise ValueError(f"Unexpected key in 'Content-Disposition' KV pair: {key}")
+
+                # Validate the filename is the expected .tgz file
+                filename = Path(value.strip().strip('"'))
+                if filename.suffixes != [".tgz"]:
+                    raise ValueError(f"Filename has unexpected extension(s): {filename.suffixes}")
+                out = Path(out, filename)
+            except KeyError as e:
+                raise KeyError(
+                    f"Unable to properly extract 'Content-Disposition' from response headers: {e}"
+                ) from e
+            except ValueError as e:
+                raise ValueError(
+                    f"Unable to parse filename from 'Content-Disposition' header: {e}"
+                ) from e
+
+        # Ensure the output path is not already occupied
+        if out.exists() and not overwrite:
+            msg = (
+                f"File already exists at {out}, cannot download the app."
+            )
+            raise Exception(msg)
+
+        # Make any parent directories as needed
+        out.parent.mkdir(parents=True, exist_ok=True)
+
+        # Check for HTTP errors
+        if response.status_code != 200:
+            msg = (
+                f"Error occurred while executing the rest call for the Splunkbase download API: "
+                f"{response.content}"
+            )
+            raise Exception(msg)
+
+        # Write the app to disk
+        with open(out, "wb") as file:
+            file.write(response.content)
+
+        return out
diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py
index 02d2756f..bd4f83df 100644
--- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py
+++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py
@@ -388,7 +388,11 @@ def metadata(self) -> dict[str, str|float]:
        # NOTE: we ignore the type error around self.status because we are using Pydantic's
        # use_enum_values configuration
        # https://docs.pydantic.dev/latest/api/config/#pydantic.config.ConfigDict.populate_by_name
-
+
+        # NOTE: The `inspect` action is HIGHLY sensitive to the structure of the metadata line in
+        # the detection stanza in savedsearches.conf. Additive operations (e.g. a new field in the
+        # dict below) should not have any impact, but renaming or removing any of these fields will
+        # break the `inspect` action.
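+        # For illustration only (hypothetical values), this dict is serialized into
+        # savedsearches.conf as a line looking roughly like:
+        #   action.correlationsearch.metadata = {"detection_id": "b1a9fcc2-...", "deprecated": "0", "detection_version": "2", "publish_time": 1715700000.123}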
         return {
            'detection_id': str(self.id),
            'deprecated': '1' if self.status == DetectionStatus.deprecated.value else '0',  # type: ignore
diff --git a/contentctl/objects/config.py b/contentctl/objects/config.py
index d506362b..5a60b700 100644
--- a/contentctl/objects/config.py
+++ b/contentctl/objects/config.py
@@ -1,26 +1,31 @@
 from __future__ import annotations
+
+from os import environ
+from datetime import datetime, UTC
+from typing import Optional, Any, List, Union, Self
+import random
+from enum import StrEnum, auto
+import pathlib
+from urllib.parse import urlparse
+from abc import ABC, abstractmethod
+from functools import partialmethod
+
+import tqdm
+import semantic_version
 from pydantic import (
    BaseModel, Field, field_validator, field_serializer, ConfigDict, DirectoryPath,
    PositiveInt, FilePath, HttpUrl, AnyUrl, model_validator, ValidationInfo
 )
+
+from contentctl.objects.constants import DOWNLOADS_DIRECTORY
 from contentctl.output.yml_writer import YmlWriter
-from os import environ
-from datetime import datetime, UTC
-from typing import Optional,Any,Annotated,List,Union, Self
-import semantic_version
-import random
-from enum import StrEnum, auto
-import pathlib
 from contentctl.helper.utils import Utils
-from urllib.parse import urlparse
-from abc import ABC, abstractmethod
 from contentctl.objects.enums import PostTestBehavior, DetectionTestingMode
 from contentctl.objects.detection import Detection
 from contentctl.objects.annotated_types import APPID_TYPE
-import tqdm
-from functools import partialmethod
+from contentctl.helper.splunk_app import SplunkApp
 
 ENTERPRISE_SECURITY_UID = 263
 COMMON_INFORMATION_MODEL_UID = 1621
@@ -289,11 +294,89 @@ class StackType(StrEnum):
    classic = auto()
    victoria = auto()
 
+
 class inspect(build):
-    splunk_api_username: str = Field(description="Splunk API username used for running appinspect.")
-    splunk_api_password: str = Field(exclude=True, description="Splunk API password used for running appinspect.")
+    splunk_api_username: str = Field(
+        description="Splunk API username used for appinspect and Splunkbase downloads."
+    )
+    splunk_api_password: str = Field(
+        exclude=True,
+        description="Splunk API password used for appinspect and Splunkbase downloads."
+    )
+    enable_metadata_validation: bool = Field(
+        default=False,
+        description=(
+            "Flag indicating whether detection metadata validation and versioning enforcement "
+            "should be enabled."
+        )
+    )
+    enrichments: bool = Field(
+        default=True,
+        description=(
+            "[NOTE: enrichments must be ENABLED for inspect to run. Please adjust your config "
+            f"or CLI invocation appropriately] {validate.model_fields['enrichments'].description}"
+        )
+    )
+    # TODO (cmcginley): wording should change here if we want to be able to download any app from
+    #   Splunkbase
+    previous_build: str | None = Field(
+        default=None,
+        description=(
+            "Local path to the previous app build for metadata validation and versioning "
+            "enforcement (defaults to the latest release of the app published on Splunkbase)."
+ ) + ) stack_type: StackType = Field(description="The type of your Splunk Cloud Stack") + @field_validator("enrichments", mode="after") + @classmethod + def validate_needed_flags_metadata_validation(cls, v: bool, info: ValidationInfo) -> bool: + """ + Validates that `enrichments` is True for the inspect action + + :param v: the field's value + :type v: bool + :param info: the ValidationInfo to be used + :type info: :class:`pydantic.ValidationInfo` + + :returns: bool, for v + :rtype: bool + """ + # Enforce that `enrichments` is True for the inspect action + if v is False: + raise ValueError("Field `enrichments` must be True for the `inspect` action") + + return v + + def get_previous_package_file_path(self) -> pathlib.Path: + """ + Returns a Path object for the path to the prior package build. If no path was provided, the + latest version is downloaded from Splunkbase and it's filepath is returned, and saved to the + in-memory config (so download doesn't happen twice in the same run). + + :returns: Path object to previous app build + :rtype: :class:`pathlib.Path` + """ + previous_build_path = self.previous_build + # Download the previous build as the latest release on Splunkbase if no path was provided + if previous_build_path is None: + print( + f"Downloading latest {self.app.label} build from Splunkbase to serve as previous " + "build during validation..." + ) + app = SplunkApp(app_uid=self.app.uid) + previous_build_path = app.download( + out=pathlib.Path(DOWNLOADS_DIRECTORY), + username=self.splunk_api_username, + password=self.splunk_api_password, + is_dir=True, + overwrite=True + ) + print(f"Latest release downloaded from Splunkbase to: {previous_build_path}") + self.previous_build = str(previous_build_path) + return pathlib.Path(previous_build_path) + + class NewContentType(StrEnum): detection = auto() story = auto() diff --git a/contentctl/objects/constants.py b/contentctl/objects/constants.py index 5cafd62b..a65e317c 100644 --- a/contentctl/objects/constants.py +++ b/contentctl/objects/constants.py @@ -136,4 +136,7 @@ RBA_OBSERVABLE_ROLE_MAPPING = { "Attacker": 0, "Victim": 1 -} \ No newline at end of file +} + +# The relative path to the directory where any apps/packages will be downloaded +DOWNLOADS_DIRECTORY = "downloads" diff --git a/contentctl/objects/detection_metadata.py b/contentctl/objects/detection_metadata.py new file mode 100644 index 00000000..46f07e78 --- /dev/null +++ b/contentctl/objects/detection_metadata.py @@ -0,0 +1,71 @@ +import uuid +from typing import Any + +from pydantic import BaseModel, Field, field_validator + + +class DetectionMetadata(BaseModel): + """ + A model of the metadata line in a detection stanza in savedsearches.conf + """ + # A bool indicating whether the detection is deprecated (serialized as an int, 1 or 0) + deprecated: bool = Field(...) + + # A UUID identifying the detection + detection_id: uuid.UUID = Field(...) + + # The version of the detection + detection_version: int = Field(...) + + # The time the detection was published. **NOTE** This field was added to the metadata in ESCU + # as of v4.39.0 + publish_time: float = Field(...) + + class Config: + # Allowing for future fields that may be added to the metadata JSON + extra = "allow" + + @field_validator("deprecated", mode="before") + @classmethod + def validate_deprecated(cls, v: Any) -> Any: + """ + Convert str to int, and then ints to bools for deprecated; raise if not 0 or 1 in the case + of an int, or if str cannot be converted to int. 
+
+        :param v: the value passed
+        :type v: :class:`typing.Any`
+
+        :returns: the value
+        :rtype: :class:`typing.Any`
+        """
+        if isinstance(v, str):
+            try:
+                v = int(v)
+            except ValueError as e:
+                raise ValueError(f"Cannot convert str value ({v}) to int: {e}") from e
+        if isinstance(v, int):
+            if not (0 <= v <= 1):
+                raise ValueError(
+                    f"Value for field 'deprecated' ({v}) must be 0 or 1, if not a bool."
+                )
+            v = bool(v)
+        return v
+
+    @field_validator("detection_version", mode="before")
+    @classmethod
+    def validate_detection_version(cls, v: Any) -> Any:
+        """
+        Convert str to int; raise if the str cannot be converted to an int.
+
+        :param v: the value passed
+        :type v: :class:`typing.Any`
+
+        :returns: the value
+        :rtype: :class:`typing.Any`
+        """
+        if isinstance(v, str):
+            try:
+                v = int(v)
+            except ValueError as e:
+                raise ValueError(f"Cannot convert str value ({v}) to int: {e}") from e
+        return v
diff --git a/contentctl/objects/detection_stanza.py b/contentctl/objects/detection_stanza.py
new file mode 100644
index 00000000..88f9c350
--- /dev/null
+++ b/contentctl/objects/detection_stanza.py
@@ -0,0 +1,79 @@
+from typing import ClassVar
+import hashlib
+from functools import cached_property
+
+from pydantic import BaseModel, Field, computed_field
+
+from contentctl.objects.detection_metadata import DetectionMetadata
+
+
+class DetectionStanza(BaseModel):
+    """
+    A model representing a stanza for a detection in savedsearches.conf
+    """
+    # The lines that comprise this stanza, in the order they appear in the conf
+    lines: list[str] = Field(...)
+
+    # The full name of the detection (e.g. "ESCU - My Detection - Rule")
+    name: str = Field(...)
+
+    # The key prefix indicating the metadata attribute
+    METADATA_LINE_PREFIX: ClassVar[str] = "action.correlationsearch.metadata = "
+
+    @computed_field
+    @cached_property
+    def metadata(self) -> DetectionMetadata:
+        """
+        The metadata extracted from the stanza. Using the provided lines, parse out the metadata
+
+        :returns: the detection stanza's metadata
+        :rtype: :class:`contentctl.objects.detection_metadata.DetectionMetadata`
+        """
+        # Set a variable to store the metadata line in
+        meta_line: str | None = None
+
+        # Iterate over the lines to look for the metadata line
+        for line in self.lines:
+            if line.startswith(DetectionStanza.METADATA_LINE_PREFIX):
+                # If we find a matching line more than once, we've hit an error
+                if meta_line is not None:
+                    raise Exception(
+                        f"Metadata for detection '{self.name}' found twice in stanza."
+                    )
+                meta_line = line
+
+        # Report if we could not find the metadata line
+        if meta_line is None:
+            raise Exception(f"No metadata for detection '{self.name}' found in stanza.")
+
+        # Parse the metadata JSON into a model
+        return DetectionMetadata.model_validate_json(meta_line[len(DetectionStanza.METADATA_LINE_PREFIX):])
+
+    @computed_field
+    @cached_property
+    def hash(self) -> str:
+        """
+        The SHA256 hash of the lines of the stanza, excluding the metadata line
+
+        :returns: hexdigest
+        :rtype: str
+        """
+        hash = hashlib.sha256()
+        for line in self.lines:
+            if not line.startswith(DetectionStanza.METADATA_LINE_PREFIX):
+                hash.update(line.encode("utf-8"))
+        return hash.hexdigest()
+
+    def version_should_be_bumped(self, previous: "DetectionStanza") -> bool:
+        """
+        A helper method that compares this stanza against the same stanza from a previous build;
+        returns True if the version still needs to be bumped (e.g. the detection was changed but
+        the version was not), False otherwise.
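+
+        For example (illustrative): if any non-metadata line of the stanza changed between builds
+        (the hashes differ) but detection_version stayed at 2 in both, this returns True; once the
+        version is bumped to 3 in the current build, it returns False.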
+
+        :param previous: the previous build's DetectionStanza for comparison
+        :type previous: :class:`contentctl.objects.detection_stanza.DetectionStanza`
+
+        :returns: True if the version still needs to be bumped
+        :rtype: bool
+        """
+        return (self.hash != previous.hash) and (self.metadata.detection_version <= previous.metadata.detection_version)
diff --git a/contentctl/objects/errors.py b/contentctl/objects/errors.py
index 3c4d630d..06f7751a 100644
--- a/contentctl/objects/errors.py
+++ b/contentctl/objects/errors.py
@@ -1,3 +1,7 @@
+from abc import ABC, abstractmethod
+from uuid import UUID
+
+
 class ValidationFailed(Exception):
    """Indicates not an error in execution, but a validation failure"""
    pass
@@ -16,3 +20,186 @@ class ServerError(IntegrationTestingError):
 class ClientError(IntegrationTestingError):
    """An error encounterd during integration testing, on the client's side (locally)"""
    pass
+
+
+class MetadataValidationError(Exception, ABC):
+    """
+    Base class for any errors arising from savedsearches.conf detection metadata validation
+    """
+    # The name of the rule the error relates to
+    rule_name: str
+
+    @property
+    @abstractmethod
+    def long_message(self) -> str:
+        """
+        A long-form error message
+        :returns: a str, the message
+        """
+        raise NotImplementedError()
+
+    @property
+    @abstractmethod
+    def short_message(self) -> str:
+        """
+        A short-form error message
+        :returns: a str, the message
+        """
+        raise NotImplementedError()
+
+
+class DetectionMissingError(MetadataValidationError):
+    """
+    An error indicating a detection in the prior build could not be found in the current build
+    """
+    def __init__(
+        self,
+        rule_name: str,
+        *args: object
+    ) -> None:
+        self.rule_name = rule_name
+        super().__init__(self.long_message, *args)
+
+    @property
+    def long_message(self) -> str:
+        """
+        A long-form error message
+        :returns: a str, the message
+        """
+        return (
+            f"Rule '{self.rule_name}' in previous build not found in current build; "
+            "detection may have been removed or renamed."
+        )
+
+    @property
+    def short_message(self) -> str:
+        """
+        A short-form error message
+        :returns: a str, the message
+        """
+        return (
+            "Detection from previous build not found in current build."
+        )
+
+
+class DetectionIDError(MetadataValidationError):
+    """
+    An error indicating the detection ID may have changed between builds
+    """
+    # The ID from the current build
+    current_id: UUID
+
+    # The ID from the previous build
+    previous_id: UUID
+
+    def __init__(
+        self,
+        rule_name: str,
+        current_id: UUID,
+        previous_id: UUID,
+        *args: object
+    ) -> None:
+        self.rule_name = rule_name
+        self.current_id = current_id
+        self.previous_id = previous_id
+        super().__init__(self.long_message, *args)
+
+    @property
+    def long_message(self) -> str:
+        """
+        A long-form error message
+        :returns: a str, the message
+        """
+        return (
+            f"Rule '{self.rule_name}' has ID {self.current_id} in current build "
+            f"and {self.previous_id} in previous build; detection IDs and "
+            "names should not change for the same detection between releases."
+        )
+
+    @property
+    def short_message(self) -> str:
+        """
+        A short-form error message
+        :returns: a str, the message
+        """
+        return (
+            f"Detection ID {self.current_id} in current build does not match ID {self.previous_id} in previous build."
+        )
+
+
+class VersioningError(MetadataValidationError, ABC):
+    """
+    A base class for any metadata validation errors relating to detection versioning
+    """
+    # The version in the current build
+    current_version: int
+
+    # The version in the previous build
+    previous_version: int
+
+    def __init__(
+        self,
+        rule_name: str,
+        current_version: int,
+        previous_version: int,
+        *args: object
+    ) -> None:
+        self.rule_name = rule_name
+        self.current_version = current_version
+        self.previous_version = previous_version
+        super().__init__(self.long_message, *args)
+
+
+class VersionDecrementedError(VersioningError):
+    """
+    An error indicating the version number went down between builds
+    """
+    @property
+    def long_message(self) -> str:
+        """
+        A long-form error message
+        :returns: a str, the message
+        """
+        return (
+            f"Rule '{self.rule_name}' has version {self.current_version} in "
+            f"current build and {self.previous_version} in previous build; "
+            "detection versions cannot decrease in successive builds."
+        )
+
+    @property
+    def short_message(self) -> str:
+        """
+        A short-form error message
+        :returns: a str, the message
+        """
+        return (
+            f"Detection version ({self.current_version}) in current build is less than version "
+            f"({self.previous_version}) in previous build."
+        )
+
+
+class VersionBumpingError(VersioningError):
+    """
+    An error indicating the detection changed but its version wasn't bumped appropriately
+    """
+    @property
+    def long_message(self) -> str:
+        """
+        A long-form error message
+        :returns: a str, the message
+        """
+        return (
+            f"Rule '{self.rule_name}' has changed in current build compared to previous "
+            "build (stanza hashes differ); the detection version should be bumped "
+            f"to at least {self.previous_version + 1}."
+        )
+
+    @property
+    def short_message(self) -> str:
+        """
+        A short-form error message
+        :returns: a str, the message
+        """
+        return (
+            f"Detection version in current build should be bumped to at least {self.previous_version + 1}."
+        )
diff --git a/contentctl/objects/savedsearches_conf.py b/contentctl/objects/savedsearches_conf.py
new file mode 100644
index 00000000..79e559c8
--- /dev/null
+++ b/contentctl/objects/savedsearches_conf.py
@@ -0,0 +1,196 @@
+
+from pathlib import Path
+from typing import Any, ClassVar
+import re
+import tempfile
+import tarfile
+
+from pydantic import BaseModel, Field, PrivateAttr
+
+from contentctl.objects.detection_stanza import DetectionStanza
+
+
+class SavedsearchesConf(BaseModel):
+    """
+    A model of the savedsearches.conf file, represented as a set of stanzas
+
+    NOTE: At present, this model only parses the detections themselves from the .conf; things like
+    baselines or response tasks are left alone currently
+    """
+    # The path to the conf file
+    path: Path = Field(...)
+
+    # The app label (used for pattern matching in the conf) (e.g. ESCU)
+    app_label: str = Field(...)
+
+    # A dictionary mapping rule names to a model of the corresponding stanza in the conf
+    detection_stanzas: dict[str, DetectionStanza] = Field(default={}, init=False)
+
+    # An internal flag indicating whether we are currently in the detections portion of the conf
+    # during parsing
+    _in_detections: bool = PrivateAttr(default=False)
+
+    # An internal flag indicating whether we are currently in a specific section of the conf
+    # during parsing
+    _in_section: bool = PrivateAttr(default=False)
+
+    # A running list of the accumulated lines identified as part of the current section
+    _current_section_lines: list[str] = PrivateAttr(default=[])
+
+    # The name of the current section
+    _current_section_name: str | None = PrivateAttr(default=None)
+
+    # The current line number as we continue to parse the file
+    _current_line_no: int = PrivateAttr(default=0)
+
+    # A format string for the path to the savedsearches.conf in the app package
+    PACKAGE_CONF_PATH_FMT_STR: ClassVar[str] = "{appid}/default/savedsearches.conf"
+
+    def model_post_init(self, __context: Any) -> None:
+        super().model_post_init(__context)
+        self._parse_detection_stanzas()
+
+    def is_section_header(self, line: str) -> bool:
+        """
+        Given a line, determine if the line is a section header, indicating the start of a new
+        section
+
+        :param line: a line from the conf file
+        :type line: str
+
+        :returns: a bool indicating whether the current line is a section header or not
+        :rtype: bool
+        """
+        # Compile the pattern based on the app name
+        pattern = re.compile(r"\[" + self.app_label + r" - .+ - Rule\]")
+        if pattern.match(line):
+            return True
+        return False
+
+    def section_start(self, line: str) -> None:
+        """
+        Given a line, adjust the state to track a new section
+
+        :param line: a line from the conf file
+        :type line: str
+        """
+        # Determine the new section name
+        new_section_name = line.strip().strip("[").strip("]")
+
+        # Raise if we are in a section already according to the state (we cannot start a new
+        # section before ending the previous section)
+        if self._in_section:
+            raise Exception(
+                "Attempting to start a new section w/o ending the current one; check for "
+                f"parsing/serialization errors: (current section: '{self._current_section_name}', "
+                f"new section: '{new_section_name}') [see line {self._current_line_no} in "
+                f"{self.path}]"
+            )
+
+        # Capture the name of this section, reset the lines, and indicate that we are now in a
+        # section
+        self._current_section_name = new_section_name
+        self._current_section_lines = [line]
+        self._in_section = True
+
+    def section_end(self) -> None:
+        """
+        Adjust the state to end the section we were enumerating; parse the lines as a
+        DetectionStanza
+        """
+        # Name should have been set during section start
+        if self._current_section_name is None:
+            raise Exception(
+                "Name for the current section was never set; check for parsing/serialization "
+                f"errors [see line {self._current_line_no} in {self.path}]."
+            )
+        elif self._current_section_name in self.detection_stanzas:
+            # Each stanza should be unique, so the name should not already be in the dict
+            raise Exception(
+                f"Name '{self._current_section_name}' already in set of stanzas [see line "
+                f"{self._current_line_no} in {self.path}]."
+            )
+
+        # Build the stanza model from the accumulated lines and adjust the state to end this section
+        self.detection_stanzas[self._current_section_name] = DetectionStanza(
+            name=self._current_section_name,
+            lines=self._current_section_lines
+        )
+        self._in_section = False
+
+    def _parse_detection_stanzas(self) -> None:
+        """
+        Open the conf file, and parse out DetectionStanza objects from the raw conf stanzas
+        """
+        # We don't want to parse the stanzas twice (non-atomic operation)
+        if len(self.detection_stanzas) != 0:
+            raise Exception(
+                f"{len(self.detection_stanzas)} stanzas have already been parsed from this conf; we"
+                " do not need to parse them again"
+            )
+
+        # Open the conf file and iterate over the lines
+        with open(self.path, "r") as file:
+            for line in file:
+                self._current_line_no += 1
+
+                # Break when we get to the end of the app detections
+                if line.strip() == f"### END {self.app_label} DETECTIONS ###":
+                    break
+                elif self._in_detections:
+                    # Check if we are in the detections portion of the conf, and then if we are in a
+                    # section
+                    if self._in_section:
+                        # If we are w/in a section and have hit an empty line, close the section
+                        if line.strip() == "":
+                            self.section_end()
+                        elif self.is_section_header(line):
+                            # Raise if we encounter a section header w/in a section
+                            raise Exception(
+                                "Encountered section header while already in section (current "
+                                f"section: '{self._current_section_name}') [see line "
+                                f"{self._current_line_no} in {self.path}]."
+                            )
+                        else:
+                            # Otherwise, append the line
+                            self._current_section_lines.append(line)
+                    elif self.is_section_header(line):
+                        # If we encounter a section header while not already in a section, start a
+                        # new one
+                        self.section_start(line)
+                    elif line.strip() != "":
+                        # If we are not in a section and have encountered anything other than an
+                        # empty line, something is wrong
+                        raise Exception(
+                            "Found a non-empty line outside a stanza [see line "
+                            f"{self._current_line_no} in {self.path}]."
+                        )
+                elif line.strip() == f"### {self.app_label} DETECTIONS ###":
+                    # We have hit the detections portion of the conf and we adjust the state
+                    # accordingly
+                    self._in_detections = True
+
+    @staticmethod
+    def init_from_package(package_path: Path, app_name: str, appid: str) -> "SavedsearchesConf":
+        """
+        Alternate constructor which takes an app package and extracts the savedsearches.conf into a
+        temporary directory.
+
+        :param package_path: Path to the app package
+        :type package_path: :class:`pathlib.Path`
+        :param app_name: the label of the app (e.g. ESCU)
+        :type app_name: str
+        :param appid: the appid of the app (used to locate savedsearches.conf in the package)
+        :type appid: str
+
+        :returns: a SavedsearchesConf object
+        :rtype: :class:`contentctl.objects.savedsearches_conf.SavedsearchesConf`
+        """
+        # Create a temporary directory
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Open the tar/gzip archive
+            with tarfile.open(package_path) as package:
+                # Extract the savedsearches.conf and use it to init the model
+                package_conf_path = SavedsearchesConf.PACKAGE_CONF_PATH_FMT_STR.format(appid=appid)
+                package.extract(package_conf_path, path=tmpdir)
+                return SavedsearchesConf(
+                    path=Path(tmpdir, package_conf_path),
+                    app_label=app_name
+                )
diff --git a/pyproject.toml b/pyproject.toml
index 9fd1c216..2b54dfd3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,7 +27,7 @@ tqdm = "^4.66.5"
 pygit2 = "^1.15.1"
 tyro = "^0.8.3"
 gitpython = "^3.1.43"
-setuptools = ">=69.5.1,<75.0.0"
+setuptools = ">=69.5.1,<76.0.0"
 
 [tool.poetry.dev-dependencies]
 
 [build-system]
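As a minimal usage sketch of how the new models compose (the package paths below are hypothetical placeholders; both builds are assumed to have the v4.39.0+ metadata structure, and `ESCU`/`DA-ESS-ContentUpdate` are the label/appid pair the format string above expects):

```python
from pathlib import Path

from contentctl.objects.savedsearches_conf import SavedsearchesConf

# Unpack savedsearches.conf from the current and previous app packages
# (paths are illustrative placeholders)
current = SavedsearchesConf.init_from_package(
    package_path=Path("dist/DA-ESS-ContentUpdate-latest.tar.gz"),
    app_name="ESCU",
    appid="DA-ESS-ContentUpdate",
)
previous = SavedsearchesConf.init_from_package(
    package_path=Path("downloads/DA-ESS-ContentUpdate-4.39.0.tgz"),
    app_name="ESCU",
    appid="DA-ESS-ContentUpdate",
)

# Flag any detection whose stanza changed without a version bump
for name, stanza in current.detection_stanzas.items():
    if name in previous.detection_stanzas:
        prior = previous.detection_stanzas[name]
        if stanza.version_should_be_bumped(prior):
            print(f"{name}: bump version to at least {prior.metadata.detection_version + 1}")
```

This is the same comparison `check_detection_metadata` performs, minus the ID/decrement checks and error aggregation.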