Skip to content

Commit

Permalink
More enhancements to catch
Browse files Browse the repository at this point in the history
when fields are missing after
running a search with real
data on a splunk instance.
This helps determine to
and even higher degree
if notables were declared
correctly and gives a high
degree of certainty that they
will be generated correctly
in ES.
  • Loading branch information
pyth0n1c committed Jul 29, 2023
1 parent db06257 commit 7b3b5da
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from contentctl.objects.test_config import TestConfig
from shutil import copyfile
from splunklib.binding import HTTPError
from splunklib.results import JSONResultsReader, Message
import os.path
import configparser
from ssl import SSLEOFError, SSLZeroReturnError
Expand All @@ -33,6 +34,7 @@
import tqdm



MAX_TEST_NAME_LENGTH = 70
TESTING_STATES = [
"Downloading Data",
Expand Down Expand Up @@ -407,7 +409,7 @@ def execute_test(

test.result = UnitTestResult()
test.result.set_job_content(
e, self.config, duration=time.time() - start_time
None, self.config, exception=e, duration=time.time() - start_time
)
self.pbar.write(
self.format_pbar_string(
Expand Down Expand Up @@ -439,7 +441,7 @@ def execute_test(
except Exception as e:
test.result = UnitTestResult()
test.result.set_job_content(
e, self.config, duration=time.time() - start_time
None, self.config, exception=e, duration=time.time() - start_time
)

if (
Expand Down Expand Up @@ -533,19 +535,73 @@ def retry_search_until_timeout(

job = self.get_conn().search(query=search, **kwargs)

# the following raises an error if there is an exception in the search
_ = job.results(output_mode="json")

results = JSONResultsReader(job.results(output_mode="json"))

observable_fields_set = set([o.name for o in detection.tags.observable])

if int(job.content.get("resultCount", "0")) > 0:
test.result = UnitTestResult()
empty_fields = set()
for result in results:
if isinstance(result, Message):
continue

#otherwise it is a dict and we will process is
results_fields_set = set(result.keys())

missing_fields = observable_fields_set - results_fields_set


if len(missing_fields) > 0:
e = Exception(f"The observable field(s) {missing_fields} are missing in the detection results")
test.result.set_job_content(
job.content,
self.config,
exception=e,
success=False,
duration=time.time() - search_start_time,
)


return




# If we find one or more fields that contain the string "null" then they were
# not populated and we should throw an error. This can happen if there is a typo
# on a field. In this case, the field will appear but will not contain any values
current_empty_fields = set()
for field in observable_fields_set:
if result.get(field,'null') == 'null':
current_empty_fields.add(field)


if len(current_empty_fields) == 0:
test.result.set_job_content(
job.content,
self.config,
success=True,
duration=time.time() - search_start_time,
)
return

else:
empty_fields = empty_fields.union(current_empty_fields)


e = Exception(f"One or more required observable fields {empty_fields} contained 'null' values. Is the data being "
"parsed correctly or is there an error in the naming of a field?")
test.result.set_job_content(
job.content,
self.config,
success=True,
exception=e,
success=False,
duration=time.time() - search_start_time,
)

return

else:
test.result = UnitTestResult()
test.result.set_job_content(
Expand All @@ -554,9 +610,10 @@ def retry_search_until_timeout(
success=False,
duration=time.time() - search_start_time,
)

tick += 1

tick += 1


print("\n\n\n\nhere5\n\n\n\n")
return

def delete_attack_data(self, attack_data_files: list[UnitTestAttackData]):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def getETA(self) -> datetime.timedelta:

def getSummaryObject(
self,
test_model_fields: list[str] = ["success", "message"],
test_model_fields: list[str] = ["success", "message", "exception"],
test_job_fields: list[str] = ["resultCount", "runDuration"],
) -> dict:
total_untested = len(self.sync_obj.inputQueue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ def stop(self):
output_file = self.getOutputFilePath()

folder_path.mkdir(parents=True, exist_ok=True)



result_dict = self.getSummaryObject()

# use the yaml writer class
with open(output_file, "w") as res:
res.write(yaml.safe_dump(result_dict))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def all_tests_successful(self) -> bool:
def get_summary(
self,
detection_fields: list[str] = ["name", "search"],
test_model_fields: list[str] = ["success", "message"],
test_model_fields: list[str] = ["success", "message", "exception"],
test_job_fields: list[str] = ["resultCount", "runDuration"],
) -> dict:
summary_dict = {}
Expand Down
2 changes: 0 additions & 2 deletions contentctl/objects/observable.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ def check_name(cls, v, values):

@validator('type')
def check_type(cls, v, values):
#import code
#code.interact(local=locals())
if v not in SES_OBSERVABLE_TYPE_MAPPING.keys():
raise ValueError(f"Invalid type '{v}' provided for observable. Valid observable types are {SES_OBSERVABLE_TYPE_MAPPING.keys()}")
return v
Expand Down
161 changes: 18 additions & 143 deletions contentctl/objects/unit_test_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ class UnitTestResult(BaseModel):
missing_observables: list[str] = []
sid_link: Union[None, str] = None
message: Union[None, str] = None
exception: bool = False
exception: Union[Exception,None] = None
success: bool = False
duration: float = 0

class Config:
validate_assignment = True
arbitrary_types_allowed = True

def get_summary_dict(
self,
Expand All @@ -31,8 +32,12 @@ def get_summary_dict(
) -> dict:
results_dict = {}
for field in model_fields:
value = getattr(self, field)
results_dict[field] = getattr(self, field)
if getattr(self, field) is not None:
if isinstance(getattr(self, field), Exception):
#Exception cannot be serialized, so convert to str
results_dict[field] = str(getattr(self, field))
else:
results_dict[field] = getattr(self, field)

for field in job_fields:
if self.job_content is not None:
Expand All @@ -50,20 +55,24 @@ def get_summary_dict(

def set_job_content(
self,
content: Union[Record, None, Exception],
content: Union[Record, None],
config: TestConfig,
exception: Union[Exception, None] = None,
success: bool = False,
duration: float = 0,
):
self.duration = round(duration, 2)
if isinstance(content, Record):
self.exception = exception
self.success = success

if content is not None:
self.job_content = content
self.success = success

if success:
self.message = "TEST PASSED"
else:
self.message = "TEST FAILED"
self.exception = False


if not config.test_instance_address.startswith("http://"):
sid_template = f"http://{SID_TEMPLATE}"
Expand All @@ -75,145 +84,11 @@ def set_job_content(
sid=content.get("sid", None),
)

elif isinstance(content, Exception):
self.job_content = None
self.success = False
self.exception = True
self.message = f"Error during test: {str(content)}"

elif content is None:
self.job_content = None
self.success = False
self.exception = True
self.message = f"Error during test: unable to run test"

else:
msg = f"Error: Unknown type for content in UnitTestResult: {type(content)}"
print(msg)
self.job_content = None
self.success = False
self.exception = True
self.message = f"Error during test - unable to run test {msg}"
return self.success

"""
def get_summary(self, test_name: str, verbose=False) -> str:
lines: list[str] = []
lines.append(f"SEARCH NAME : '{test_name}'")
if verbose or self.determine_success() == False:
lines.append(f"SEARCH : {self.get_search()}")
lines.append(f"SUCCESS : {self.determine_success()}")
if self.exception is True:
lines.append(f"EXCEPTION : {self.exception}")
if self.message is not None:
lines.append(f"MESSAGE : {self.message}")
else:
lines.append(f"SUCCESS : {self.determine_success()}")
if len(self.missing_observables) > 0:
lines.append(f"MISSING OBSERVABLES: {self.missing_observables}")
return "\n\t".join(lines)
def get_search(self) -> str:
if self.job_content is not None:
return self.job_content.get(
"search", "NO SEARCH FOUND - JOB MISSING SEARCH FIELD"
)
return "NO SEARCH FOUND - JOB IS EMPTY"
def add_message(self, message: str):
if self.message is None:
self.message = message
else:
self.message += f"\n{message}"
@root_validator(pre=False)
def update_success(cls, values):
if values["job_content"] is None:
values["exception"] = True
values["success"] = False
if values["message"] is None:
# If the message has not been overridden, then put in a default
values["message"] = "Job Content was None - unknown failure reason"
# Otherwise, a message has been passed so don't overwrite it
return values
if "messages" in values["job_content"]:
fatal_or_error = False
all_messages = values["job_content"]["messages"]
unique_messages = set()
for level, level_messages in all_messages.items():
if level in ["info"]:
# we will skip any info messages
continue
elif level in ["fatal", "error"]:
for msg in level_messages:
# These error indicate a failure - the search was
# not successful. They are important for debugging,
# so we will pass them to the user.
# They also represent a an error during the test
values["logic"] = False
values["success"] = False
values["exception"] = True
unique_messages.add(msg)
fatal_or_error = True
else:
unknown_messages_as_single_string = "\n".join(level_messages)
unique_messages.add(unknown_messages_as_single_string)
if len(unique_messages) == 0:
values["message"] = None # No messages
self.message = f"Error during test: {str(content)}"

else:
# Merge all those messages together
values["message"] = "\n".join(unique_messages)
if fatal_or_error:
return values
# Can there still be a success even if there was an error/fatal message above? Probably not?
if (
"resultCount" in values["job_content"]
and int(values["job_content"]["resultCount"]) == 1
):
# in the future we probably want other metrics, about noise or others, here
values["logic"] = True
values["success"] = True
elif (
"resultCount" in values["job_content"]
and int(values["job_content"]["resultCount"]) != 1
):
values["logic"] = False
values["success"] = False
else:
raise (Exception("Result created with indeterminate success."))
return values
def update_missing_observables(self, missing_observables: set[str]):
self.missing_observables = list(missing_observables)
self.success = self.determine_success()
def determine_success(self) -> bool:
# values_dict = self.update_success(self.__dict__)
# self.exception = values_dict['exception']
# self.success = values_dict['success']
return self.success

def get_job_field(self, fieldName: str):
if self.job_content is None:
# return f"FIELD NAME {fieldName} does not exist in Job Content because Job Content is NONE"
return None
return self.job_content.get(fieldName, None)
def get_time(self) -> timedelta:
if self.job_content is None:
return timedelta(0)
elif "runDuration" in self.job_content:
duration = str(self.job_content["runDuration"])
return timedelta(float(duration))
else:
raise (Exception("runDuration missing from job."))
"""

0 comments on commit 7b3b5da

Please sign in to comment.