Skip to content

Commit

Permalink
More testing and fixing
Browse files Browse the repository at this point in the history
  • Loading branch information
originalsouth committed Nov 12, 2024
1 parent bd7b82d commit 13400b3
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 17 deletions.
5 changes: 3 additions & 2 deletions octopoes/nibbles/definitions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import importlib
import pkgutil
from collections.abc import Iterable
from logging import getLogger
from pathlib import Path
from types import MethodType, ModuleType
Expand Down Expand Up @@ -27,7 +28,7 @@ def __eq__(self, other):
return False

def __hash__(self):
return hash(str(self.ooi_type) + self.relation_path if self.relation_path else "\0")
return hash(str(self.ooi_type) + self.relation_path if self.relation_path else "\a")


class NibbleDefinition:
Expand All @@ -52,7 +53,7 @@ def __init__(
self.default_enabled = default_enabled
self.config_ooi_relation_path = config_ooi_relation_path

def __call__(self, args):
def __call__(self, args: Iterable[OOI]) -> OOI | Iterable[OOI | None] | None:
if self.payload is None:
raise NotImplementedError
else:
Expand Down
40 changes: 28 additions & 12 deletions octopoes/nibbles/runner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections.abc import Callable
from collections.abc import Callable, Iterable
from datetime import datetime
from itertools import chain, product
from typing import TypeVar
Expand All @@ -14,14 +14,24 @@
U = TypeVar("U")


def otype(ooi: OOI) -> type[OOI]:
def object_type(ooi: OOI) -> type[OOI]:
return type_by_name(ooi.get_ooi_type())


def mergewith(func: Callable[[set[T], set[T]], set[T]], d1: dict[U, set[T]], d2: dict[U, set[T]]) -> dict[U, set[T]]:
def merge_with(func: Callable[[set[T], set[T]], set[T]], d1: dict[U, set[T]], d2: dict[U, set[T]]) -> dict[U, set[T]]:
return {k: func(d1.get(k, set()), d2.get(k, set())) for k in set(d1) | set(d2)}


def flatten(items: Iterable[OOI | Iterable[OOI | None] | None]) -> Iterable[OOI]:
for item in items:
if isinstance(item, OOI):
yield item
elif item is None:
continue
else:
yield from flatten(item)


class NibblesRunner:
def __init__(
self,
Expand All @@ -39,21 +49,25 @@ def _retrieve(self, types: set[type[OOI]], valid_time: datetime) -> None:
cached_types = set(self.objects_by_type_cache)
target_types = set(filter(lambda x: x not in cached_types, types))
objects = self.ooi_repository.list_oois_by_object_types(target_types, valid_time)
objects_by_type = {t: {x for x in objects if isinstance(otype(x), t)} for t in set(map(otype, objects))}
self.objects_by_type_cache = mergewith(set.union, self.objects_by_type_cache, objects_by_type)
objects_by_type = {
t: {x for x in objects if isinstance(object_type(x), t)} for t in set(map(object_type, objects))
}
self.objects_by_type_cache = merge_with(set.union, self.objects_by_type_cache, objects_by_type)

def _run(self, ooi: OOI, valid_time: datetime) -> dict[str, set[OOI]]:
retval: dict[str, set[OOI]] = {}
target_nibbles = list(filter(lambda x: type(ooi) in x.signature, self.nibbles))
self._retrieve(
set(map(lambda x: x.ooi_type, chain.from_iterable(map(lambda x: x.signature, target_nibbles)))), valid_time
set(
map(lambda sgn: sgn.ooi_type, chain.from_iterable(map(lambda nibble: nibble.signature, target_nibbles)))
),
valid_time,
)
for nibble in target_nibbles:
# TODO: filter OOI not abiding the parameters from radix
radix = [self.objects_by_type_cache[sgn.ooi_type] for sgn in nibble.signature]
results = set(
filter(lambda ooi: ooi is not None, chain(map(nibble, filter(lambda x: ooi in x, product(*radix)))))
)
radix = (self.objects_by_type_cache[sgn.ooi_type] for sgn in nibble.signature)
data = (nibble(arg) for arg in product(*radix) if ooi in arg)
results = {obj for obj in flatten(data)}
if results:
retval |= {nibble.id: results}
return retval
Expand All @@ -70,10 +84,12 @@ def infer(self, stack: list[OOI], valid_time: datetime) -> dict[OOI, dict[str, s
retval: dict[OOI, dict[str, set[OOI]]] = {}
blockset = set(stack)
self.objects_by_type_cache = {}
if self._cleared(stack[-1], valid_time):
if stack and self._cleared(stack[-1], valid_time):
while stack:
ooi = stack.pop()
self.objects_by_type_cache = mergewith(set.union, self.objects_by_type_cache, {otype(ooi): {ooi}})
self.objects_by_type_cache = merge_with(
set.union, self.objects_by_type_cache, {object_type(ooi): {ooi}}
)
results = self._run(ooi, valid_time)
if results:
blocks = set(chain.from_iterable(results.values()))
Expand Down
76 changes: 73 additions & 3 deletions octopoes/tests/integration/test_nibbles.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,35 @@
import os
import sys
from collections.abc import Iterator
from datetime import datetime
from ipaddress import IPv4Address, ip_address
from unittest.mock import Mock

import pytest
from nibbles.definitions import NibbleDefinition, NibbleParameterDefinition
from nibbles.runner import NibblesRunner

from octopoes.core.service import OctopoesService
from octopoes.models import OOI, ScanLevel
from octopoes.models.ooi.network import Network
from octopoes.models.ooi.dns.zone import Hostname
from octopoes.models.ooi.network import IPAddressV4, IPAddressV6, Network
from octopoes.models.ooi.web import URL, HostnameHTTPURL, IPAddressHTTPURL, WebScheme

if os.environ.get("CI") != "1":
pytest.skip("Needs XTDB multinode container.", allow_module_level=True)


NMAX = 13
dummy_nibble = NibbleDefinition(name="dummy", signature=[NibbleParameterDefinition(ooi_type=Network)])


def dummy(network: Network):
def dummy(network: Network) -> Network | None:
global NMAX
if len(network.name) < NMAX:
new_name = network.name + "I"
return Network(name=new_name)


dummy_nibble = NibbleDefinition(name="dummy", signature=[NibbleParameterDefinition(ooi_type=Network)])
dummy_nibble.payload = getattr(sys.modules[__name__], "dummy")


Expand All @@ -45,3 +51,67 @@ def test_dummy_nibble(xtdb_octopoes_service: OctopoesService, event_manager: Moc
ctx = 1 + NMAX - len(network.name)
assert xtdb_octopoes_service.ooi_repository.list_oois({Network}, valid_time).count == ctx
assert xtdb_octopoes_service.ooi_repository.list_oois({OOI}, valid_time).count == 3 * ctx


def url_classification(url: URL) -> Iterator[OOI]:
if url.raw.scheme == "http" or url.raw.scheme == "https":
port = url.raw.port
if port is None:
if url.raw.scheme == "https":
port = 443
elif url.raw.scheme == "http":
port = 80

path = url.raw.path if url.raw.path is not None else "/"

default_args = {"network": url.network, "scheme": WebScheme(url.raw.scheme), "port": port, "path": path}
try:
addr = ip_address(url.raw.host)
except ValueError:
hostname = Hostname(network=url.network, name=url.raw.host)
hostname_url = HostnameHTTPURL(netloc=hostname.reference, **default_args)
original_url = URL(network=url.network, raw=url.raw, web_url=hostname_url.reference)
yield hostname
yield hostname_url
yield original_url
else:
if isinstance(addr, IPv4Address):
ip = IPAddressV4(network=url.network, address=addr)
ip_url = IPAddressHTTPURL(netloc=ip.reference, **default_args)
original_url = URL(network=url.network, raw=url.raw, web_url=ip_url.reference)
yield ip
yield ip_url
yield original_url
else:
ip = IPAddressV6(network=url.network, address=addr)
ip_url = IPAddressHTTPURL(netloc=ip.reference, **default_args)
original_url = URL(network=url.network, raw=url.raw, web_url=ip_url.reference)
yield ip
yield ip_url
yield original_url


url_classification_nibble = NibbleDefinition(
name="url_classification", signature=[NibbleParameterDefinition(ooi_type=URL)], min_scan_level=-1
)
url_classification_nibble.payload = getattr(sys.modules[__name__], "url_classification")


def test_url_classification_nibble(xtdb_octopoes_service: OctopoesService, event_manager: Mock, valid_time: datetime):
nibble_runner = NibblesRunner(
xtdb_octopoes_service.ooi_repository,
xtdb_octopoes_service.scan_profile_repository,
xtdb_octopoes_service.origin_parameter_repository,
)
nibble_runner.nibbles = [url_classification_nibble]
network = Network(name="internet")
xtdb_octopoes_service.ooi_repository.save(network, valid_time)
url = URL(network=network.reference, raw="https://mispo.es/")
xtdb_octopoes_service.ooi_repository.save(url, valid_time)
event_manager.complete_process_events(xtdb_octopoes_service)

result = nibble_runner.infer([url], valid_time)

assert url in result
assert "url_classification" in result[url]
assert len(result[url]["url_classification"]) == 3

0 comments on commit 13400b3

Please sign in to comment.