diff --git a/contentctl/actions/detection_testing/GitService.py b/contentctl/actions/detection_testing/GitService.py index bfed85a3..fe1f4ca8 100644 --- a/contentctl/actions/detection_testing/GitService.py +++ b/contentctl/actions/detection_testing/GitService.py @@ -67,9 +67,9 @@ def getChanges(self, target_branch:str)->List[Detection]: #Make a filename to content map filepath_to_content_map = { obj.file_path:obj for (_,obj) in self.director.name_to_content_map.items()} - updated_detections:List[Detection] = [] - updated_macros:List[Macro] = [] - updated_lookups:List[Lookup] =[] + updated_detections:set[Detection] = set() + updated_macros:set[Macro] = set() + updated_lookups:set[Lookup] = set() for diff in all_diffs: if type(diff) == pygit2.Patch: @@ -80,14 +80,14 @@ def getChanges(self, target_branch:str)->List[Detection]: if decoded_path.is_relative_to(self.config.path/"detections") and decoded_path.suffix == ".yml": detectionObject = filepath_to_content_map.get(decoded_path, None) if isinstance(detectionObject, Detection): - updated_detections.append(detectionObject) + updated_detections.add(detectionObject) else: raise Exception(f"Error getting detection object for file {str(decoded_path)}") elif decoded_path.is_relative_to(self.config.path/"macros") and decoded_path.suffix == ".yml": macroObject = filepath_to_content_map.get(decoded_path, None) if isinstance(macroObject, Macro): - updated_macros.append(macroObject) + updated_macros.add(macroObject) else: raise Exception(f"Error getting macro object for file {str(decoded_path)}") @@ -98,7 +98,7 @@ def getChanges(self, target_branch:str)->List[Detection]: updatedLookup = filepath_to_content_map.get(decoded_path, None) if not isinstance(updatedLookup,Lookup): raise Exception(f"Expected {decoded_path} to be type {type(Lookup)}, but instead if was {(type(updatedLookup))}") - updated_lookups.append(updatedLookup) + updated_lookups.add(updatedLookup) elif decoded_path.suffix == ".csv": # If the CSV was updated, we want to make sure that we @@ -125,7 +125,7 @@ def getChanges(self, target_branch:str)->List[Detection]: if updatedLookup is not None and updatedLookup not in updated_lookups: # It is possible that both the CSV and YML have been modified for the same lookup, # and we do not want to add it twice. - updated_lookups.append(updatedLookup) + updated_lookups.add(updatedLookup) else: pass @@ -136,7 +136,7 @@ def getChanges(self, target_branch:str)->List[Detection]: # If a detection has at least one dependency on changed content, # then we must test it again - changed_macros_and_lookups = updated_macros + updated_lookups + changed_macros_and_lookups:set[SecurityContentObject] = updated_macros.union(updated_lookups) for detection in self.director.detections: if detection in updated_detections: @@ -146,14 +146,14 @@ def getChanges(self, target_branch:str)->List[Detection]: for obj in changed_macros_and_lookups: if obj in detection.get_content_dependencies(): - updated_detections.append(detection) + updated_detections.add(detection) break #Print out the names of all modified/new content modifiedAndNewContentString = "\n - ".join(sorted([d.name for d in updated_detections])) print(f"[{len(updated_detections)}] Pieces of modifed and new content (this may include experimental/deprecated/manual_test content):\n - {modifiedAndNewContentString}") - return updated_detections + return sorted(list(updated_detections)) def getSelected(self, detectionFilenames: List[FilePath]) -> List[Detection]: filepath_to_content_map: dict[FilePath, SecurityContentObject] = { diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index 1b716097..1100b72b 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -167,6 +167,7 @@ def adjust_tests_and_groups(self) -> None: the model from the list of unit tests. Also, preemptively skips all manual tests, as well as tests for experimental/deprecated detections and Correlation type detections. """ + # Since ManualTest and UnitTest are not differentiable without looking at the manual_test # tag, Pydantic builds all tests as UnitTest objects. If we see the manual_test flag, we # convert these to ManualTest @@ -789,6 +790,45 @@ def search_observables_exist_validate(self): # Found everything return self + @field_validator("tests", mode="before") + def ensure_yml_test_is_unittest(cls, v:list[dict]): + """The typing for the tests field allows it to be one of + a number of different types of tests. However, ONLY + UnitTest should be allowed to be defined in the YML + file. If part of the UnitTest defined in the YML + is incorrect, such as the attack_data file, then + it will FAIL to be instantiated as a UnitTest and + may instead be instantiated as a different type of + test, such as IntegrationTest (since that requires + less fields) which is incorrect. Ensure that any + raw data read from the YML can actually construct + a valid UnitTest and, if not, return errors right + away instead of letting Pydantic try to construct + it into a different type of test + + Args: + v (list[dict]): list of dicts read from the yml. + Each one SHOULD be a valid UnitTest. If we cannot + construct a valid unitTest from it, a ValueError should be raised + + Returns: + _type_: The input of the function, assuming no + ValueError is raised. + """ + valueErrors:list[ValueError] = [] + for unitTest in v: + #This raises a ValueError on a failed UnitTest. + try: + UnitTest.model_validate(unitTest) + except ValueError as e: + valueErrors.append(e) + if len(valueErrors): + raise ValueError(valueErrors) + # All of these can be constructred as UnitTests with no + # Exceptions, so let the normal flow continue + return v + + @field_validator("tests") def tests_validate( cls, diff --git a/contentctl/objects/macro.py b/contentctl/objects/macro.py index 48daf602..ba5faa8f 100644 --- a/contentctl/objects/macro.py +++ b/contentctl/objects/macro.py @@ -10,7 +10,6 @@ from contentctl.input.director import DirectorOutputDto from contentctl.objects.security_content_object import SecurityContentObject - #The following macros are included in commonly-installed apps. #As such, we will ignore if they are missing from our app. #Included in @@ -55,10 +54,15 @@ def get_macros(text_field:str, director:DirectorOutputDto , ignore_macros:set[st #If a comment ENDS in a macro, for example ```this is a comment with a macro `macro_here```` #then there is a small edge case where the regex below does not work properly. If that is #the case, we edit the search slightly to insert a space - text_field = re.sub(r"\`\`\`\`", r"` ```", text_field) - text_field = re.sub(r"\`\`\`.*?\`\`\`", " ", text_field) - + if re.findall(r"\`\`\`\`", text_field): + raise ValueError("Search contained four or more '`' characters in a row which is invalid SPL" + "This may have occurred when a macro was commented out.\n" + "Please ammend your search to remove the substring '````'") + # replace all the macros with a space + text_field = re.sub(r"\`\`\`[\s\S]*?\`\`\`", " ", text_field) + + macros_to_get = re.findall(r'`([^\s]+)`', text_field) #If macros take arguments, stop at the first argument. We just want the name of the macro macros_to_get = set([macro[:macro.find('(')] if macro.find('(') != -1 else macro for macro in macros_to_get]) @@ -68,4 +72,3 @@ def get_macros(text_field:str, director:DirectorOutputDto , ignore_macros:set[st macros_to_get -= macros_to_ignore return Macro.mapNamesToSecurityContentObjects(list(macros_to_get), director) - \ No newline at end of file