Skip to content

Commit

Permalink
Merge branch 'main' into mathieugonzales_replace_deprecated_pydantic_…
Browse files Browse the repository at this point in the history
…validators
  • Loading branch information
pyth0n1c authored Oct 15, 2024
2 parents d3e063a + 02eb5d7 commit 1550aff
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 27 deletions.
20 changes: 10 additions & 10 deletions contentctl/actions/detection_testing/GitService.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ def getChanges(self, target_branch:str)->List[Detection]:

#Make a filename to content map
filepath_to_content_map = { obj.file_path:obj for (_,obj) in self.director.name_to_content_map.items()}
updated_detections:List[Detection] = []
updated_macros:List[Macro] = []
updated_lookups:List[Lookup] =[]
updated_detections:set[Detection] = set()
updated_macros:set[Macro] = set()
updated_lookups:set[Lookup] = set()

for diff in all_diffs:
if type(diff) == pygit2.Patch:
Expand All @@ -80,14 +80,14 @@ def getChanges(self, target_branch:str)->List[Detection]:
if decoded_path.is_relative_to(self.config.path/"detections") and decoded_path.suffix == ".yml":
detectionObject = filepath_to_content_map.get(decoded_path, None)
if isinstance(detectionObject, Detection):
updated_detections.append(detectionObject)
updated_detections.add(detectionObject)
else:
raise Exception(f"Error getting detection object for file {str(decoded_path)}")

elif decoded_path.is_relative_to(self.config.path/"macros") and decoded_path.suffix == ".yml":
macroObject = filepath_to_content_map.get(decoded_path, None)
if isinstance(macroObject, Macro):
updated_macros.append(macroObject)
updated_macros.add(macroObject)
else:
raise Exception(f"Error getting macro object for file {str(decoded_path)}")

Expand All @@ -98,7 +98,7 @@ def getChanges(self, target_branch:str)->List[Detection]:
updatedLookup = filepath_to_content_map.get(decoded_path, None)
if not isinstance(updatedLookup,Lookup):
raise Exception(f"Expected {decoded_path} to be type {type(Lookup)}, but instead it was {(type(updatedLookup))}")
updated_lookups.append(updatedLookup)
updated_lookups.add(updatedLookup)

elif decoded_path.suffix == ".csv":
# If the CSV was updated, we want to make sure that we
Expand All @@ -125,7 +125,7 @@ def getChanges(self, target_branch:str)->List[Detection]:
if updatedLookup is not None and updatedLookup not in updated_lookups:
# It is possible that both the CSV and YML have been modified for the same lookup,
# and we do not want to add it twice.
updated_lookups.append(updatedLookup)
updated_lookups.add(updatedLookup)

else:
pass
Expand All @@ -136,7 +136,7 @@ def getChanges(self, target_branch:str)->List[Detection]:

# If a detection has at least one dependency on changed content,
# then we must test it again
changed_macros_and_lookups = updated_macros + updated_lookups
changed_macros_and_lookups:set[SecurityContentObject] = updated_macros.union(updated_lookups)

for detection in self.director.detections:
if detection in updated_detections:
Expand All @@ -146,14 +146,14 @@ def getChanges(self, target_branch:str)->List[Detection]:

for obj in changed_macros_and_lookups:
if obj in detection.get_content_dependencies():
updated_detections.append(detection)
updated_detections.add(detection)
break

#Print out the names of all modified/new content
modifiedAndNewContentString = "\n - ".join(sorted([d.name for d in updated_detections]))

print(f"[{len(updated_detections)}] Pieces of modified and new content (this may include experimental/deprecated/manual_test content):\n - {modifiedAndNewContentString}")
return updated_detections
return sorted(list(updated_detections))

def getSelected(self, detectionFilenames: List[FilePath]) -> List[Detection]:
filepath_to_content_map: dict[FilePath, SecurityContentObject] = {
Expand Down
10 changes: 6 additions & 4 deletions contentctl/actions/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,11 @@ def check_detection_metadata(self, config: inspect) -> None:
validation_errors[rule_name] = []
# No detections should be removed from build to build
if rule_name not in current_build_conf.detection_stanzas:
validation_errors[rule_name].append(DetectionMissingError(rule_name=rule_name))
if config.suppress_missing_content_exceptions:
print(f"[SUPPRESSED] {DetectionMissingError(rule_name=rule_name).long_message}")
else:
validation_errors[rule_name].append(DetectionMissingError(rule_name=rule_name))
continue

# Pull out the individual stanza for readability
previous_stanza = previous_build_conf.detection_stanzas[rule_name]
current_stanza = current_build_conf.detection_stanzas[rule_name]
Expand Down Expand Up @@ -335,7 +337,7 @@ def check_detection_metadata(self, config: inspect) -> None:
)

# Convert our dict mapping to a flat list of errors for use in reporting
validation_error_list = [x for inner_list in validation_errors.values() for x in inner_list]
validation_error_list = [x for inner_list in validation_errors.values() for x in inner_list]

# Report failure/success
print("\nDetection Metadata Validation:")
Expand All @@ -355,4 +357,4 @@ def check_detection_metadata(self, config: inspect) -> None:
raise ExceptionGroup(
"Validation errors when comparing detection stanzas in current and previous build:",
validation_error_list
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def adjust_tests_and_groups(self) -> None:
the model from the list of unit tests. Also, preemptively skips all manual tests, as well as
tests for experimental/deprecated detections and Correlation type detections.
"""

# Since ManualTest and UnitTest are not differentiable without looking at the manual_test
# tag, Pydantic builds all tests as UnitTest objects. If we see the manual_test flag, we
# convert these to ManualTest
Expand Down Expand Up @@ -789,6 +790,45 @@ def search_observables_exist_validate(self):
# Found everything
return self

@field_validator("tests", mode="before")
def ensure_yml_test_is_unittest(cls, v: list[dict]) -> list[dict]:
    """Ensure every raw test entry read from the YML is a valid UnitTest.

    The typing for the tests field allows a number of different test
    types, but ONLY UnitTest should be allowed to be defined in the YML
    file. If part of a UnitTest defined in the YML is incorrect, such as
    the attack_data file, it will FAIL to be instantiated as a UnitTest
    and may instead be instantiated as a different type of test, such as
    IntegrationTest (since that requires fewer fields), which is
    incorrect. Validate that each raw dict can actually construct a
    valid UnitTest and, if not, raise errors right away instead of
    letting Pydantic coerce it into a different type of test.

    Args:
        v (list[dict]): list of dicts read from the yml. Each one
            SHOULD be a valid UnitTest.

    Returns:
        list[dict]: the unmodified input, if every entry validates.

    Raises:
        ValueError: aggregating one ValueError per entry that could not
            be constructed as a UnitTest, so all failures are reported
            at once.
    """
    validation_errors: list[ValueError] = []
    for raw_test in v:
        # model_validate raises a ValueError (ValidationError) when the
        # dict cannot be constructed as a UnitTest.
        try:
            UnitTest.model_validate(raw_test)
        except ValueError as e:
            validation_errors.append(e)
    if validation_errors:
        # Surface every failing test together rather than one at a time.
        raise ValueError(validation_errors)
    # All of these can be constructed as UnitTests with no exceptions,
    # so let the normal Pydantic validation flow continue.
    return v


@field_validator("tests")
def tests_validate(
cls,
Expand Down
18 changes: 11 additions & 7 deletions contentctl/objects/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ def getApp(self, config:test, stage_file=True)->str:
verbose_print=True)
return str(destination)



# TODO (#266): disable the use_enum_values configuration
class Config_Base(BaseModel):
model_config = ConfigDict(use_enum_values=True,validate_default=True, arbitrary_types_allowed=True)
Expand Down Expand Up @@ -288,7 +286,6 @@ def getAPIPath(self)->pathlib.Path:

def getAppTemplatePath(self)->pathlib.Path:
return self.path/"app_template"



class StackType(StrEnum):
Expand All @@ -311,6 +308,16 @@ class inspect(build):
"should be enabled."
)
)
suppress_missing_content_exceptions: bool = Field(
default=False,
description=(
"Suppress exceptions during metadata validation if a detection that existed in "
"the previous build does not exist in this build. This is to ensure that content "
"is not accidentally removed. In order to support testing both public and private "
"content, this warning can be suppressed. If it is suppressed, it will still be "
"printed out as a warning."
)
)
enrichments: bool = Field(
default=True,
description=(
Expand Down Expand Up @@ -952,7 +959,6 @@ def check_environment_variable_for_config(cls, v:List[Infrastructure]):
index+=1



class release_notes(Config_Base):
old_tag:Optional[str] = Field(None, description="Name of the tag to diff against to find new content. "
"If it is not supplied, then it will be inferred as the "
Expand Down Expand Up @@ -1034,6 +1040,4 @@ def ensureNewTagOrLatestBranch(self):
# raise ValueError("The latest_branch '{self.latest_branch}' was not found in the repository")


# return self


# return self
13 changes: 8 additions & 5 deletions contentctl/objects/macro.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from contentctl.input.director import DirectorOutputDto
from contentctl.objects.security_content_object import SecurityContentObject


#The following macros are included in commonly-installed apps.
#As such, we will ignore if they are missing from our app.
#Included in
Expand Down Expand Up @@ -55,10 +54,15 @@ def get_macros(text_field:str, director:DirectorOutputDto , ignore_macros:set[st
#If a comment ENDS in a macro, for example ```this is a comment with a macro `macro_here````
#then there is a small edge case where the regex below does not work properly. If that is
#the case, we edit the search slightly to insert a space
text_field = re.sub(r"\`\`\`\`", r"` ```", text_field)
text_field = re.sub(r"\`\`\`.*?\`\`\`", " ", text_field)

if re.findall(r"\`\`\`\`", text_field):
raise ValueError("Search contained four or more '`' characters in a row which is invalid SPL. "
"This may have occurred when a macro was commented out.\n"
"Please amend your search to remove the substring '````'")

# replace all the macros with a space
text_field = re.sub(r"\`\`\`[\s\S]*?\`\`\`", " ", text_field)


macros_to_get = re.findall(r'`([^\s]+)`', text_field)
#If macros take arguments, stop at the first argument. We just want the name of the macro
macros_to_get = set([macro[:macro.find('(')] if macro.find('(') != -1 else macro for macro in macros_to_get])
Expand All @@ -68,4 +72,3 @@ def get_macros(text_field:str, director:DirectorOutputDto , ignore_macros:set[st
macros_to_get -= macros_to_ignore
return Macro.mapNamesToSecurityContentObjects(list(macros_to_get), director)


2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pydantic = "^2.8.2"
PyYAML = "^6.0.2"
requests = "~2.32.3"
pycvesearch = "^1.2"
xmltodict = "^0.13.0"
xmltodict = ">=0.13,<0.15"
attackcti = "^0.4.0"
Jinja2 = "^3.1.4"
questionary = "^2.0.1"
Expand Down

0 comments on commit 1550aff

Please sign in to comment.