From 2c87187a40647a670df727404049554d6ca0e4e0 Mon Sep 17 00:00:00 2001 From: Casey McGinley Date: Tue, 8 Oct 2024 00:13:30 -0700 Subject: [PATCH] added logic for checking when to run cms testing --- .../DetectionTestingManager.py | 38 +++++-- .../DetectionTestingInfrastructure.py | 102 +++++++++++++++--- .../objects/content_versioning_service.py | 54 ++++++---- 3 files changed, 152 insertions(+), 42 deletions(-) diff --git a/contentctl/actions/detection_testing/DetectionTestingManager.py b/contentctl/actions/detection_testing/DetectionTestingManager.py index 4f04b202..a1d659ca 100644 --- a/contentctl/actions/detection_testing/DetectionTestingManager.py +++ b/contentctl/actions/detection_testing/DetectionTestingManager.py @@ -65,7 +65,7 @@ def sigint_handler(signum, frame): print("*******************************") signal.signal(signal.SIGINT, sigint_handler) - + with concurrent.futures.ThreadPoolExecutor( max_workers=len(self.input_dto.config.test_instances), ) as instance_pool, concurrent.futures.ThreadPoolExecutor( @@ -73,11 +73,19 @@ def sigint_handler(signum, frame): ) as view_runner, concurrent.futures.ThreadPoolExecutor( max_workers=len(self.input_dto.config.test_instances), ) as view_shutdowner: + # Capture any errors for reporting at the end after all threads have been gathered + errors: dict[str, list[Exception]] = { + "INSTANCE SETUP ERRORS": [], + "TESTING ERRORS": [], + "ERRORS DURING VIEW SHUTDOWN": [], + "ERRORS DURING VIEW EXECUTION": [], + } # Start all the views future_views = { view_runner.submit(view.setup): view for view in self.input_dto.views } + # Configure all the instances future_instances_setup = { instance_pool.submit(instance.setup): instance @@ -87,10 +95,10 @@ def sigint_handler(signum, frame): # Wait for all instances to be set up for future in concurrent.futures.as_completed(future_instances_setup): try: - result = future.result() + _ = future.result() except Exception as e: self.output_dto.terminate = True - print(f"Error setting up instance: {str(e)}") + errors["INSTANCE SETUP ERRORS"].append(e) # Start and wait for all tests to run if not self.output_dto.terminate: @@ -102,10 +110,10 @@ def sigint_handler(signum, frame): # Wait for execution to finish for future in concurrent.futures.as_completed(future_instances_execute): try: - result = future.result() + _ = future.result() except Exception as e: self.output_dto.terminate = True - print(f"Error running in container: {str(e)}") + errors["TESTING ERRORS"].append(e) self.output_dto.terminate = True @@ -115,16 +123,28 @@ def sigint_handler(signum, frame): } for future in concurrent.futures.as_completed(future_views_shutdowner): try: - result = future.result() + _ = future.result() except Exception as e: - print(f"Error stopping view: {str(e)}") + errors["ERRORS DURING VIEW SHUTDOWN"].append(e) # Wait for original view-related threads to complete for future in concurrent.futures.as_completed(future_views): try: - result = future.result() + _ = future.result() except Exception as e: - print(f"Error running container: {str(e)}") + errors["ERRORS DURING VIEW EXECUTION"].append(e) + + # Log any errors + for error_type in errors: + if len(errors[error_type]) > 0: + print() + print(f"[{error_type}]:") + for error in errors[error_type]: + print(f"\t❌ {str(error)}") + if isinstance(error, ExceptionGroup): + for suberror in error.exceptions: # type: ignore + print(f"\t\t❌ {str(suberror)}") # type: ignore + print() return self.output_dto diff --git a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py index 2841b3a5..2757665a 100644 --- a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py +++ b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py @@ -20,8 +20,9 @@ from splunklib.results import JSONResultsReader, Message # type: ignore from urllib3 import disable_warnings import urllib.parse +from semantic_version import Version # type: ignore -from contentctl.objects.config import test_common, Infrastructure +from contentctl.objects.config import test_common, Infrastructure, All from contentctl.objects.enums import PostTestBehavior, AnalyticsType from contentctl.objects.detection import Detection from contentctl.objects.base_test import BaseTest @@ -42,6 +43,9 @@ TestingStates ) +# The app name of ES; needed to check ES version +ES_APP_NAME = "SplunkEnterpriseSecuritySuite" + class SetupTestGroupResults(BaseModel): exception: Union[Exception, None] = None @@ -127,17 +131,27 @@ def setup(self): ) self.start_time = time.time() + + # Init the list of setup functions we always need setup_functions: list[tuple[Callable[[], None | client.Service], str]] = [ (self.start, "Starting"), (self.get_conn, "Waiting for App Installation"), (self.configure_conf_file_datamodels, "Configuring Datamodels"), (self.create_replay_index, f"Create index '{self.sync_obj.replay_index}'"), + (self.check_for_es_install, "Checking for ES Install"), (self.configure_imported_roles, "Configuring Roles"), (self.configure_delete_indexes, "Configuring Indexes"), (self.configure_hec, "Configuring HEC"), ] - setup_functions = setup_functions + self.content_versioning_service.setup_functions + + # Add any setup functions only applicable to content versioning validation + if self.should_test_content_versioning: + setup_functions = setup_functions + self.content_versioning_service.setup_functions + + # Add the final setup function setup_functions.append((self.wait_for_ui_ready, "Finishing Setup")) + + # Execute and report on each setup function try: for func, msg in setup_functions: self.format_pbar_string( @@ -150,9 +164,11 @@ def setup(self): self.check_for_teardown() except Exception as e: - self.pbar.write(str(e)) + msg = f"[{self.get_name()}]: {str(e)}" self.finish() - raise + if isinstance(e, ExceptionGroup): + raise ExceptionGroup(msg, e.exceptions) from e # type: ignore + raise Exception(msg) from e self.format_pbar_string(TestReportingType.SETUP, self.get_name(), "Finished Setup!") @@ -162,6 +178,14 @@ def wait_for_ui_ready(self): @computed_field @property def content_versioning_service(self) -> ContentVersioningService: + """ + A computed field returning a handle to the content versioning service, used by ES to + version detections. We use this model to validate that all detections have been installed + compatibly with ES versioning. + + :return: a handle to the content versioning service on the instance + :rtype: :class:`contentctl.objects.content_versioning_service.ContentVersioningService` + """ return ContentVersioningService( global_config=self.global_config, infrastructure=self.infrastructure, @@ -169,6 +193,57 @@ def content_versioning_service(self) -> ContentVersioningService: detections=self.sync_obj.inputQueue ) + @property + def should_test_content_versioning(self) -> bool: + """ + Indicates whether we should test content versioning. Content versioning + should be tested when integration testing is enabled, the mode is all, and ES is at least + version 8.0.0. + + :return: a bool indicating whether we should test content versioning + :rtype: bool + """ + es_version = self.es_version + return ( + self.global_config.enable_integration_testing + and isinstance(self.global_config.mode, All) + and es_version is not None + and es_version >= Version("8.0.0") + ) + + @property + def es_version(self) -> Version | None: + """ + Returns the version of Enterprise Security installed on the instance; None if not installed. + + :return: the version of ES, as a semver aware object + :rtype: :class:`semantic_version.Version` + """ + if not self.es_installed: + return None + return Version(self.get_conn().apps[ES_APP_NAME]["version"]) # type: ignore + + @property + def es_installed(self) -> bool: + """ + Indicates whether ES is installed on the instance. + + :return: a bool indicating whether ES is installed or not + :rtype: bool + """ + return ES_APP_NAME in self.get_conn().apps + + def check_for_es_install(self) -> None: + """ + Validating function which raises an error if Enterprise Security is not installed and + integration testing is enabled. + """ + if not self.es_installed and self.global_config.enable_integration_testing: + raise Exception( + "Enterprise Security does not appear to be installed on this instance and " + "integration testing is enabled." + ) + def configure_hec(self): self.hec_channel = str(uuid.uuid4()) try: @@ -282,25 +357,22 @@ def configure_imported_roles( ): indexes.append(self.sync_obj.replay_index) indexes_encoded = ";".join(indexes) + + # Include ES roles if installed + if self.es_installed: + imported_roles = imported_roles + enterprise_security_roles try: self.get_conn().roles.post( self.infrastructure.splunk_app_username, - imported_roles=imported_roles + enterprise_security_roles, + imported_roles=imported_roles, srchIndexesAllowed=indexes_encoded, srchIndexesDefault=self.sync_obj.replay_index, ) return except Exception as e: - self.pbar.write( - f"Enterprise Security Roles do not exist:'{enterprise_security_roles}: {str(e)}" - ) - - self.get_conn().roles.post( - self.infrastructure.splunk_app_username, - imported_roles=imported_roles, - srchIndexesAllowed=indexes_encoded, - srchIndexesDefault=self.sync_obj.replay_index, - ) + msg = f"Error configuring roles: {str(e)}" + self.pbar.write(msg) + raise Exception(msg) from e def configure_delete_indexes(self, indexes: list[str] = ["_*", "*"]): indexes.append(self.sync_obj.replay_index) diff --git a/contentctl/objects/content_versioning_service.py b/contentctl/objects/content_versioning_service.py index e752d081..d6203acc 100644 --- a/contentctl/objects/content_versioning_service.py +++ b/contentctl/objects/content_versioning_service.py @@ -17,21 +17,42 @@ # TODO (cmcginley): remove this logger logger = get_logger() +# TODO (cmcginley): would it be better for this to only run on one instance? Or to consolidate +# error reporting at least? + class ContentVersioningService(BaseModel): + """ + A model representing the content versioning service used in ES 8.0.0+. This model can be used + to validate that detections have been installed in a way that is compatible with content + versioning. + """ + + # The global contentctl config global_config: test_common + + # The instance specific infra config infrastructure: Infrastructure + + # The splunklib service service: splunklib.Service + + # The list of detections detections: list[Detection] + # The cached job on the splunk instance of the cms events _cms_main_job: splunklib.Job | None = PrivateAttr(default=None) class Config: + # We need to allow arbitrary type for the splunklib service arbitrary_types_allowed = True @computed_field @property def setup_functions(self) -> list[tuple[Callable[[], None], str]]: + """ + Returns the list of setup functions needed for content versioning testing + """ return [ (self.activate_versioning, "Activating Content Versioning"), (self.wait_for_cms_main, "Waiting for CMS Parser"), @@ -56,7 +77,7 @@ def _query_content_versioning_service(self, method: str, body: dict[str, Any] = # Query the content versioning service try: - response = self.service.request( # type: ignore + response = self.service.request( # type: ignore method=method, path_segment="configs/conf-feature_flags/general", body=body, @@ -119,8 +140,6 @@ def activate_versioning(self) -> None: """ Activate the content versioning service """ - # TODO (cmcginley): add conditional logging s.t. this check only happens when integration - # testing is enabled AND ES is at least version 8.0, AND mode is `all` # Post to the SA-ContentVersioning service to set versioning status self._query_content_versioning_service( method="POST", @@ -325,7 +344,7 @@ def validate_content_against_cms(self) -> None: # Report any errors extracting the detection name from the longer rule name if match is None: msg = ( - f"Entry in cms_main ('{cms_entry_name}') did not match the expected naming " + f"[{cms_entry_name}]: Entry in cms_main did not match the expected naming " "scheme; cannot compare to our detections." ) logger.error(msg) @@ -339,7 +358,7 @@ def validate_content_against_cms(self) -> None: # unexpected repeated entry if cms_entry_name in matched_detections: msg = ( - f"Detection '{cms_entry_name}' appears more than once in the cms_main " + f"[{cms_entry_name}]: Detection appears more than once in the cms_main " "index." ) logger.error(msg) @@ -376,7 +395,7 @@ def validate_content_against_cms(self) -> None: # Generate an exception if we couldn't match the CMS main entry to a detection if result_matches_detection is False: msg = ( - f"Could not match entry in cms_main for ('{cms_entry_name}') against any " + f"[{cms_entry_name}]: Could not match entry in cms_main against any " "of the expected detections." ) logger.error(msg) @@ -388,7 +407,7 @@ def validate_content_against_cms(self) -> None: # Generate exceptions for the unmatched detections for detection_name in remaining_detections: msg = ( - f"Detection '{detection_name}' not found in cms_main; there may be an " + f"[{detection_name}]: Detection not found in cms_main; there may be an " "issue with savedsearches.conf" ) logger.error(msg) @@ -430,8 +449,8 @@ def validate_detection_against_cms_event( # Compare the UUIDs if cms_uuid != detection.id: msg = ( - f"UUID in cms_event ('{cms_uuid}') does not match UUID in detection " - f"('{detection.id}'): {detection.name}" + f"[{detection.name}]: UUID in cms_event ('{cms_uuid}') does not match UUID in " + f"detection ('{detection.id}')" ) logger.error(msg) return Exception(msg) @@ -439,33 +458,32 @@ def validate_detection_against_cms_event( # Compare the versions (we append '-1' to the detection version to be in line w/ the # internal representation in ES) msg = ( - f"Version in cms_event ('{cms_event['version']}') does not match version in " - f"detection ('{detection.version}-1'): {detection.name}" + f"[{detection.name}]: Version in cms_event ('{cms_event['version']}') does not " + f"match version in detection ('{detection.version}-1')" ) logger.error(msg) return Exception(msg) elif cms_event[full_search_key] != rule_name_from_detection: # Compare the full search name msg = ( - f"Full search name in cms_event ('{cms_event[full_search_key]}') " - f"does not match detection name ('{detection.name}')" + f"[{detection.name}]: Full search name in cms_event " + f"('{cms_event[full_search_key]}') does not match detection name" ) logger.error(msg) return Exception(msg) elif cms_event["action.correlationsearch.label"] != f"{self.global_config.app.label} - {detection.name} - Rule": # Compare the correlation search label msg = ( - f"Correlation search label in cms_event " - f"('{cms_event['action.correlationsearch.label']}') does not match detection name " - f"('{detection.name}')" + f"[{detection.name}]: Correlation search label in cms_event " + f"('{cms_event['action.correlationsearch.label']}') does not match detection name" ) logger.error(msg) return Exception(msg) elif cms_event["sourcetype"] != f"{self.global_config.app.label} - {detection.name} - Rule": # Compare the full search name msg = ( - f"Sourcetype in cms_event ('{cms_event[f'sourcetype']}') does not match detection " - f"name ('{detection.name}')" + f"[{detection.name}]: Sourcetype in cms_event ('{cms_event[f'sourcetype']}') does " + f"not match detection name" ) logger.error(msg) return Exception(msg)