Skip to content

Commit

Permalink
Merge pull request #19 from rmouritzen-splunk/patch-observables
Browse files Browse the repository at this point in the history
Add support for defining observables in "patch extends"
  • Loading branch information
rmouritzen-splunk authored Apr 15, 2024
2 parents 85be026 + 2c1d9f4 commit ae540a9
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 50 deletions.
10 changes: 5 additions & 5 deletions ocsf_validator/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ class OcsfProfile(TypedDict):
OcsfObject = TypedDict(
"OcsfObject",
{
"caption": str,
"description": str,
"name": str,
"caption": NotRequired[str],
"description": NotRequired[str],
"name": NotRequired[str],
"attributes": Dict[str, OcsfAttr],
"extends": NotRequired[Union[str, list[Optional[str]]]],
"observable": NotRequired[int],
Expand All @@ -143,8 +143,8 @@ class OcsfProfile(TypedDict):
"OcsfEvent",
{
"attributes": Dict[str, OcsfAttr],
"caption": str,
"name": str,
"caption": NotRequired[str],
"name": NotRequired[str],
"uid": NotRequired[int],
"category": NotRequired[str],
"description": NotRequired[str],
Expand Down
67 changes: 22 additions & 45 deletions ocsf_validator/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ def validate(reader: Reader, file: str):
if t not in found:
found[t] = {}

if "name" in reader[file]:
# The patch extends case _always_ has the the same name as its base
if "name" in reader[file] and not _is_patch_extends(reader[file]):
name = reader[file]["name"]
if name not in found[t]:
found[t][name] = []
Expand Down Expand Up @@ -458,13 +459,13 @@ def validate_dictionaries(reader: Reader, file: str) -> None:
if TYPES_KEY in reader[file]:
check_attributes(
reader[file][TYPES_KEY],
lambda a_key, item: f"{item.get('caption')} (Dictionary Type)",
lambda a_key, item: f'"{a_key}" (Dictionary Type)',
file,
)

check_attributes(
reader[file],
lambda a_key, item: f"{item.get('caption')} (Dictionary Attribute)",
lambda a_key, item: f'"{a_key}" (Dictionary Attribute)',
file,
)

Expand Down Expand Up @@ -501,29 +502,10 @@ def validate_classes(reader: Reader, file: str) -> None:
)
collector.handle(IllegalObservableTypeIDError(cause))

if _is_patch_extends(reader[file]):
if any_attribute_has_observable(reader[file]):
cause = (
f"Illegal definition of one or more attributes with"
f' "{OBSERVABLE_KEY}" in patch extends class, file "{file}":'
f" observable definitions in patch extends are not supported."
f" Please file an issue if you find this feature necessary."
)
collector.handle(IllegalObservableTypeIDError(cause))

if OBSERVABLES_KEY in reader[file]:
cause = (
f'Illegal "{OBSERVABLES_KEY}" definition in patch extends class,'
f' file "{file}": observable definitions in patch extends are not.'
f" supported. Please file an issue if you find this feature"
f" necessary."
)
collector.handle(IllegalObservableTypeIDError(cause))

# Check class-specific attributes
check_attributes(
reader[file],
lambda a_key, item: f"{reader[file].get('caption')} Class: {a_key}"
lambda a_key, item: f"{_item_name(reader[file])} class: {a_key}"
f" (Class-Specific Attribute)",
file,
)
Expand All @@ -533,7 +515,7 @@ def validate_classes(reader: Reader, file: str) -> None:
for attribute_path in reader[file][OBSERVABLES_KEY]:
check_collision(
reader[file][OBSERVABLES_KEY][attribute_path],
f"{reader[file]['caption']} Class: {attribute_path}"
f"{_item_name(reader[file])} class: {attribute_path}"
f" (Class-Specific Attribute Path)",
file,
)
Expand Down Expand Up @@ -581,37 +563,18 @@ def validate_objects(reader: Reader, file: str) -> None:
)
collector.handle(IllegalObservableTypeIDError(cause))

if _is_patch_extends(reader[file]):
if any_attribute_has_observable(reader[file]):
cause = (
f"Illegal definition of one or more attributes with"
f' "{OBSERVABLE_KEY}" in patch extends object, file "{file}":'
f" observable definitions in patch extends are not supported."
f" Please file an issue if you find this feature necessary."
)
collector.handle(IllegalObservableTypeIDError(cause))

if OBSERVABLE_KEY in reader[file]:
cause = (
f'Illegal "{OBSERVABLE_KEY}" definition in patch extends object,'
f' file "{file}": observable definitions in patch extends are not.'
f" supported. Please file an issue if you find this feature"
f" necessary."
)
collector.handle(IllegalObservableTypeIDError(cause))

# Check top-level observable -- entire object is an observable
if OBSERVABLE_KEY in reader[file]:
check_collision(
reader[file][OBSERVABLE_KEY],
f"{reader[file].get('caption')} (Object)",
f"{_item_name(reader[file])} (Object)",
file,
)

# Check object-specific attributes
check_attributes(
reader[file],
lambda a_key, item: f"{reader[file].get('caption')} Object: {a_key}"
lambda a_key, item: f"{_item_name(reader[file])} object: {a_key}"
f" (Object-Specific Attribute)",
file,
)
Expand All @@ -623,6 +586,20 @@ def validate_objects(reader: Reader, file: str) -> None:
return observables


def _item_name(item):
if _is_patch_extends(item):
suffix = " [patch extends]"
else:
suffix = ""
name = item.get("name")
if name:
return f'"{name}"{suffix}'
extends = item.get("extends")
if extends:
return f'"{extends}"{suffix}'
return f"<unknown>{suffix}"


def _is_patch_extends(item):
"""
Returns True if class or object is a "special" patch extends, which allows
Expand Down

0 comments on commit ae540a9

Please sign in to comment.