Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for defining observables in "patch extends" #19

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ocsf_validator/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ class OcsfProfile(TypedDict):
OcsfObject = TypedDict(
"OcsfObject",
{
"caption": str,
"description": str,
"name": str,
"caption": NotRequired[str],
"description": NotRequired[str],
"name": NotRequired[str],
"attributes": Dict[str, OcsfAttr],
"extends": NotRequired[Union[str, list[Optional[str]]]],
"observable": NotRequired[int],
Expand All @@ -143,8 +143,8 @@ class OcsfProfile(TypedDict):
"OcsfEvent",
{
"attributes": Dict[str, OcsfAttr],
"caption": str,
"name": str,
"caption": NotRequired[str],
"name": NotRequired[str],
"uid": NotRequired[int],
"category": NotRequired[str],
"description": NotRequired[str],
Expand Down
67 changes: 22 additions & 45 deletions ocsf_validator/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ def validate(reader: Reader, file: str):
if t not in found:
found[t] = {}

if "name" in reader[file]:
# The patch extends case _always_ has the the same name as its base
if "name" in reader[file] and not _is_patch_extends(reader[file]):
name = reader[file]["name"]
if name not in found[t]:
found[t][name] = []
Expand Down Expand Up @@ -458,13 +459,13 @@ def validate_dictionaries(reader: Reader, file: str) -> None:
if TYPES_KEY in reader[file]:
check_attributes(
reader[file][TYPES_KEY],
lambda a_key, item: f"{item.get('caption')} (Dictionary Type)",
lambda a_key, item: f'"{a_key}" (Dictionary Type)',
file,
)

check_attributes(
reader[file],
lambda a_key, item: f"{item.get('caption')} (Dictionary Attribute)",
lambda a_key, item: f'"{a_key}" (Dictionary Attribute)',
file,
)

Expand Down Expand Up @@ -501,29 +502,10 @@ def validate_classes(reader: Reader, file: str) -> None:
)
collector.handle(IllegalObservableTypeIDError(cause))

if _is_patch_extends(reader[file]):
if any_attribute_has_observable(reader[file]):
cause = (
f"Illegal definition of one or more attributes with"
f' "{OBSERVABLE_KEY}" in patch extends class, file "{file}":'
f" observable definitions in patch extends are not supported."
f" Please file an issue if you find this feature necessary."
)
collector.handle(IllegalObservableTypeIDError(cause))

if OBSERVABLES_KEY in reader[file]:
cause = (
f'Illegal "{OBSERVABLES_KEY}" definition in patch extends class,'
f' file "{file}": observable definitions in patch extends are not.'
f" supported. Please file an issue if you find this feature"
f" necessary."
)
collector.handle(IllegalObservableTypeIDError(cause))

# Check class-specific attributes
check_attributes(
reader[file],
lambda a_key, item: f"{reader[file].get('caption')} Class: {a_key}"
lambda a_key, item: f"{_item_name(reader[file])} class: {a_key}"
f" (Class-Specific Attribute)",
file,
)
Expand All @@ -533,7 +515,7 @@ def validate_classes(reader: Reader, file: str) -> None:
for attribute_path in reader[file][OBSERVABLES_KEY]:
check_collision(
reader[file][OBSERVABLES_KEY][attribute_path],
f"{reader[file]['caption']} Class: {attribute_path}"
f"{_item_name(reader[file])} class: {attribute_path}"
f" (Class-Specific Attribute Path)",
file,
)
Expand Down Expand Up @@ -581,37 +563,18 @@ def validate_objects(reader: Reader, file: str) -> None:
)
collector.handle(IllegalObservableTypeIDError(cause))

if _is_patch_extends(reader[file]):
if any_attribute_has_observable(reader[file]):
cause = (
f"Illegal definition of one or more attributes with"
f' "{OBSERVABLE_KEY}" in patch extends object, file "{file}":'
f" observable definitions in patch extends are not supported."
f" Please file an issue if you find this feature necessary."
)
collector.handle(IllegalObservableTypeIDError(cause))

if OBSERVABLE_KEY in reader[file]:
cause = (
f'Illegal "{OBSERVABLE_KEY}" definition in patch extends object,'
f' file "{file}": observable definitions in patch extends are not.'
f" supported. Please file an issue if you find this feature"
f" necessary."
)
collector.handle(IllegalObservableTypeIDError(cause))

# Check top-level observable -- entire object is an observable
if OBSERVABLE_KEY in reader[file]:
check_collision(
reader[file][OBSERVABLE_KEY],
f"{reader[file].get('caption')} (Object)",
f"{_item_name(reader[file])} (Object)",
file,
)

# Check object-specific attributes
check_attributes(
reader[file],
lambda a_key, item: f"{reader[file].get('caption')} Object: {a_key}"
lambda a_key, item: f"{_item_name(reader[file])} object: {a_key}"
f" (Object-Specific Attribute)",
file,
)
Expand All @@ -623,6 +586,20 @@ def validate_objects(reader: Reader, file: str) -> None:
return observables


def _item_name(item):
if _is_patch_extends(item):
suffix = " [patch extends]"
else:
suffix = ""
name = item.get("name")
if name:
return f'"{name}"{suffix}'
extends = item.get("extends")
if extends:
return f'"{extends}"{suffix}'
return f"<unknown>{suffix}"


def _is_patch_extends(item):
"""
Returns True if class or object is a "special" patch extends, which allows
Expand Down
Loading