Skip to content

Commit

Permalink
added logic for checking when to run cms testing
Browse files Browse the repository at this point in the history
  • Loading branch information
cmcginley-splunk committed Oct 8, 2024
1 parent 336cd7b commit 2c87187
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 42 deletions.
38 changes: 29 additions & 9 deletions contentctl/actions/detection_testing/DetectionTestingManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,27 @@ def sigint_handler(signum, frame):
print("*******************************")

signal.signal(signal.SIGINT, sigint_handler)

with concurrent.futures.ThreadPoolExecutor(
max_workers=len(self.input_dto.config.test_instances),
) as instance_pool, concurrent.futures.ThreadPoolExecutor(
max_workers=len(self.input_dto.views)
) as view_runner, concurrent.futures.ThreadPoolExecutor(
max_workers=len(self.input_dto.config.test_instances),
) as view_shutdowner:
# Capture any errors for reporting at the end after all threads have been gathered
errors: dict[str, list[Exception]] = {
"INSTANCE SETUP ERRORS": [],
"TESTING ERRORS": [],
"ERRORS DURING VIEW SHUTDOWN": [],
"ERRORS DURING VIEW EXECUTION": [],
}

# Start all the views
future_views = {
view_runner.submit(view.setup): view for view in self.input_dto.views
}

# Configure all the instances
future_instances_setup = {
instance_pool.submit(instance.setup): instance
Expand All @@ -87,10 +95,10 @@ def sigint_handler(signum, frame):
# Wait for all instances to be set up
for future in concurrent.futures.as_completed(future_instances_setup):
try:
result = future.result()
_ = future.result()
except Exception as e:
self.output_dto.terminate = True
print(f"Error setting up instance: {str(e)}")
errors["INSTANCE SETUP ERRORS"].append(e)

# Start and wait for all tests to run
if not self.output_dto.terminate:
Expand All @@ -102,10 +110,10 @@ def sigint_handler(signum, frame):
# Wait for execution to finish
for future in concurrent.futures.as_completed(future_instances_execute):
try:
result = future.result()
_ = future.result()
except Exception as e:
self.output_dto.terminate = True
print(f"Error running in container: {str(e)}")
errors["TESTING ERRORS"].append(e)

self.output_dto.terminate = True

Expand All @@ -115,16 +123,28 @@ def sigint_handler(signum, frame):
}
for future in concurrent.futures.as_completed(future_views_shutdowner):
try:
result = future.result()
_ = future.result()
except Exception as e:
print(f"Error stopping view: {str(e)}")
errors["ERRORS DURING VIEW SHUTDOWN"].append(e)

# Wait for original view-related threads to complete
for future in concurrent.futures.as_completed(future_views):
try:
result = future.result()
_ = future.result()
except Exception as e:
print(f"Error running container: {str(e)}")
errors["ERRORS DURING VIEW EXECUTION"].append(e)

# Log any errors
for error_type in errors:
if len(errors[error_type]) > 0:
print()
print(f"[{error_type}]:")
for error in errors[error_type]:
print(f"\t{str(error)}")
if isinstance(error, ExceptionGroup):
for suberror in error.exceptions: # type: ignore
print(f"\t\t{str(suberror)}") # type: ignore
print()

return self.output_dto

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
from splunklib.results import JSONResultsReader, Message # type: ignore
from urllib3 import disable_warnings
import urllib.parse
from semantic_version import Version # type: ignore

from contentctl.objects.config import test_common, Infrastructure
from contentctl.objects.config import test_common, Infrastructure, All
from contentctl.objects.enums import PostTestBehavior, AnalyticsType
from contentctl.objects.detection import Detection
from contentctl.objects.base_test import BaseTest
Expand All @@ -42,6 +43,9 @@
TestingStates
)

# The app name of ES; needed to check ES version
ES_APP_NAME = "SplunkEnterpriseSecuritySuite"


class SetupTestGroupResults(BaseModel):
exception: Union[Exception, None] = None
Expand Down Expand Up @@ -127,17 +131,27 @@ def setup(self):
)

self.start_time = time.time()

# Init the list of setup functions we always need
setup_functions: list[tuple[Callable[[], None | client.Service], str]] = [
(self.start, "Starting"),
(self.get_conn, "Waiting for App Installation"),
(self.configure_conf_file_datamodels, "Configuring Datamodels"),
(self.create_replay_index, f"Create index '{self.sync_obj.replay_index}'"),
(self.check_for_es_install, "Checking for ES Install"),
(self.configure_imported_roles, "Configuring Roles"),
(self.configure_delete_indexes, "Configuring Indexes"),
(self.configure_hec, "Configuring HEC"),
]
setup_functions = setup_functions + self.content_versioning_service.setup_functions

# Add any setup functions only applicable to content versioning validation
if self.should_test_content_versioning:
setup_functions = setup_functions + self.content_versioning_service.setup_functions

# Add the final setup function
setup_functions.append((self.wait_for_ui_ready, "Finishing Setup"))

# Execute and report on each setup function
try:
for func, msg in setup_functions:
self.format_pbar_string(
Expand All @@ -150,9 +164,11 @@ def setup(self):
self.check_for_teardown()

except Exception as e:
self.pbar.write(str(e))
msg = f"[{self.get_name()}]: {str(e)}"
self.finish()
raise
if isinstance(e, ExceptionGroup):
raise ExceptionGroup(msg, e.exceptions) from e # type: ignore
raise Exception(msg) from e

self.format_pbar_string(TestReportingType.SETUP, self.get_name(), "Finished Setup!")

Expand All @@ -162,13 +178,72 @@ def wait_for_ui_ready(self):
@computed_field
@property
def content_versioning_service(self) -> ContentVersioningService:
"""
A computed field returning a handle to the content versioning service, used by ES to
version detections. We use this model to validate that all detections have been installed
compatibly with ES versioning.
:return: a handle to the content versioning service on the instance
:rtype: :class:`contentctl.objects.content_versioning_service.ContentVersioningService`
"""
return ContentVersioningService(
global_config=self.global_config,
infrastructure=self.infrastructure,
service=self.get_conn(),
detections=self.sync_obj.inputQueue
)

@property
def should_test_content_versioning(self) -> bool:
"""
Indicates whether we should test content versioning. Content versioning
should be tested when integration testing is enabled, the mode is all, and ES is at least
version 8.0.0.
:return: a bool indicating whether we should test content versioning
:rtype: bool
"""
es_version = self.es_version
return (
self.global_config.enable_integration_testing
and isinstance(self.global_config.mode, All)
and es_version is not None
and es_version >= Version("8.0.0")
)

@property
def es_version(self) -> Version | None:
"""
Returns the version of Enterprise Security installed on the instance; None if not installed.
:return: the version of ES, as a semver aware object
:rtype: :class:`semantic_version.Version`
"""
if not self.es_installed:
return None
return Version(self.get_conn().apps[ES_APP_NAME]["version"]) # type: ignore

@property
def es_installed(self) -> bool:
"""
Indicates whether ES is installed on the instance.
:return: a bool indicating whether ES is installed or not
:rtype: bool
"""
return ES_APP_NAME in self.get_conn().apps

def check_for_es_install(self) -> None:
"""
Validating function which raises an error if Enterprise Security is not installed and
integration testing is enabled.
"""
if not self.es_installed and self.global_config.enable_integration_testing:
raise Exception(
"Enterprise Security does not appear to be installed on this instance and "
"integration testing is enabled."
)

def configure_hec(self):
self.hec_channel = str(uuid.uuid4())
try:
Expand Down Expand Up @@ -282,25 +357,22 @@ def configure_imported_roles(
):
indexes.append(self.sync_obj.replay_index)
indexes_encoded = ";".join(indexes)

# Include ES roles if installed
if self.es_installed:
imported_roles = imported_roles + enterprise_security_roles
try:
self.get_conn().roles.post(
self.infrastructure.splunk_app_username,
imported_roles=imported_roles + enterprise_security_roles,
imported_roles=imported_roles,
srchIndexesAllowed=indexes_encoded,
srchIndexesDefault=self.sync_obj.replay_index,
)
return
except Exception as e:
self.pbar.write(
f"Enterprise Security Roles do not exist:'{enterprise_security_roles}: {str(e)}"
)

self.get_conn().roles.post(
self.infrastructure.splunk_app_username,
imported_roles=imported_roles,
srchIndexesAllowed=indexes_encoded,
srchIndexesDefault=self.sync_obj.replay_index,
)
msg = f"Error configuring roles: {str(e)}"
self.pbar.write(msg)
raise Exception(msg) from e

def configure_delete_indexes(self, indexes: list[str] = ["_*", "*"]):
indexes.append(self.sync_obj.replay_index)
Expand Down
Loading

0 comments on commit 2c87187

Please sign in to comment.