Skip to content

Commit

Permalink
a little refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
khyatimahendru committed Feb 10, 2025
1 parent 373925a commit 7fc3a8d
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 41 deletions.
2 changes: 1 addition & 1 deletion misc/pyudmi/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion misc/pyudmi/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ packages = [
{ include = "udmi", from = "src" },
]

[tool.poetry.plugins."udmi.discovery_scans"]
[tool.poetry.plugins."udmi.network_discovery_scans"]
global_bacnet = "udmi.client.manager.discovery.global_bacnet_scan:GlobalBacnetScan"
nmap_banner = "udmi.client.manager.discovery.nmap_banner_scan:NmapBannerScan"
passive_network = "udmi.client.manager.discovery.passive_network_scan:PassiveNetworkScan"
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def get_manager(key, *args, **kwargs):
ScanNotImplemented: If the specified key is not found in the registered
scans.
"""
entry_point_group = "udmi.discovery_scans" # The entry point group name
entry_point_group = "udmi.network_discovery_scans"
discovery_scans = {}

for entry_point in pkg_resources.iter_entry_points(entry_point_group):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
State
)

from udmi.client.manager.discovery.discovery_manager import DiscoveryManager
from udmi.client.manager.discovery.discovery_manager import (
from udmi.client.manager.discovery.network_discovery_manager import (
NetworkDiscoveryManager,
catch_exceptions_to_status,
mark_task_complete_on_return
)
Expand All @@ -42,7 +42,7 @@ class BacnetObjectAcronyms(StrEnum):
characterstringValue = "CSV"


class GlobalBacnetScan(DiscoveryManager):
class GlobalBacnetScan(NetworkDiscoveryManager):
scan_family = "bacnet"

def __init__(
Expand All @@ -59,7 +59,7 @@ def __init__(
self.bacnet = BAC0.lite(ip=bacnet_ip, port=bacnet_port)
super().__init__(state, publisher)

def start_discovery(self) -> None:
def start_scan(self) -> None:
self.devices_published.clear()
self.cancelled = False
self.runner_thread = threading.Thread(
Expand All @@ -68,7 +68,7 @@ def start_discovery(self) -> None:
self.runner_thread.start()
self.bacnet.discover(global_broadcast=True)

def stop_discovery(self) -> None:
def stop_scan(self) -> None:
self.cancelled = True
self.runner_thread.join()

Expand Down Expand Up @@ -115,8 +115,8 @@ def get_discovery_event(self, device_addr, device_id) -> DiscoveryEvents:
# Capture existence of the device
event = DiscoveryEvents(
generation=self.config.generation,
scan_family=self.scan_family,
scan_addr=str(device_id)
family=self.scan_family,
addr=str(device_id)
)

try:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import abc
import atexit
import copy
import dataclasses
import datetime
import functools
import logging
Expand Down Expand Up @@ -72,7 +73,7 @@ def _validate_discovery_config(config: FamilyDiscoveryConfig):
raise RuntimeError("scan duration or interval cannot be negative")


class DiscoveryManager(abc.ABC):
class NetworkDiscoveryManager(abc.ABC):

@property
@abc.abstractmethod
Expand All @@ -83,7 +84,7 @@ def scan_family(self):
"""

@abc.abstractmethod
def start_discovery(self) -> None:
def start_scan(self) -> None:
"""
Trigger the start of discovery.
Expand All @@ -97,7 +98,7 @@ def start_discovery(self) -> None:
"""

@abc.abstractmethod
def stop_discovery(self) -> None:
def stop_scan(self) -> None:
"""
Trigger stopping discovery and clean shutdown of any discovery
processes, threads, etc
Expand Down Expand Up @@ -174,7 +175,7 @@ def _start(self):
self.generation = datetime.datetime.now()
self.state.generation = self.generation
try:
self.start_discovery()
self.start_scan()
except Exception as err:
self._handle_exception(err)
else:
Expand All @@ -197,7 +198,7 @@ def _stop(self):
self.state.status = None
self._set_internal_status(Status.CANCELLING)
try:
self.stop_discovery()
self.stop_scan()
self.state.phase = Phase.stopped
self._set_internal_status(Status.CANCELLED)
logging.info(f"Stopped... {self.__class__.__name__}")
Expand All @@ -212,7 +213,7 @@ def publish(self, event: DiscoveryEvents):
event_number = self._increment_event_counter_and_get()
event.event_no = event_number
logging.warning(
f"published discovery for {event.scan_family}:{event.scan_addr} "
f"published discovery for {event.family}:{event.addr} "
f"#{event_number}")
self.publisher(event)

Expand All @@ -221,8 +222,8 @@ def _reset_udmi_state(self):
Resets the UDMI state for this family by setting all keys as null.
:return:
"""
for field_name in self.state.__fields__:
setattr(self.state, field_name, None)
for field in dataclasses.fields(self.state):
setattr(self.state, field.name, None)

def on_state_update_hook(self):
"""
Expand Down
25 changes: 13 additions & 12 deletions misc/pyudmi/src/udmi/client/manager/discovery/nmap_banner_scan.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
import dataclasses
import logging
import os
import subprocess
import threading
from typing import Callable

from udmi.schema import (
FamilyDiscovery,
DiscoveryEvents,
State
)

from udmi.client.manager.discovery.discovery_manager import DiscoveryManager
from udmi.client.manager.discovery.discovery_manager import (
from udmi.client.manager.discovery.network_discovery_manager import (
NetworkDiscoveryManager,
catch_exceptions_to_status,
mark_task_complete_on_return
)
from udmi.util import nmap


class NmapBannerScan(DiscoveryManager):

class NmapBannerScan(NetworkDiscoveryManager):
scan_family = "ether"

def __init__(self,
Expand All @@ -31,14 +32,14 @@ def __init__(self,
self.runner_thread = None
super().__init__(state, publisher)

def start_discovery(self) -> None:
def start_scan(self) -> None:
self.cancel_threads.clear()
self.runner_thread = threading.Thread(
target=self.runner, args=[], daemon=True
)
self.runner_thread.start()

def stop_discovery(self) -> None:
def stop_scan(self) -> None:
logging.info(f"stopping scan {self.__class__.__name__}")
self.cancel_threads.set()
self.runner_thread.join()
Expand Down Expand Up @@ -82,11 +83,11 @@ def runner(self):
for host in nmap.results_reader(nmap_output_file):
event = DiscoveryEvents(
generation=self.generation,
scan_family=self.scan_family,
scan_addr=host.ip,
family=self.scan_family,
addr=host.ip,
families={
"port": {p.port_number: {"banner": p.banner} for p in
host.ports}
},
p.port_number: FamilyDiscovery(addr=p.banner)
for p in host.ports
}
)
self.publish(event.to_json())
self.publish(event)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
State
)

from udmi.client.manager.discovery.discovery_manager import DiscoveryManager
from udmi.client.manager.discovery.network_discovery_manager import (
NetworkDiscoveryManager
)

BACNET_BVLC_MARKER = b"\x81"
BACNET_APDU_I_AM_START = b"\x10\x00\xc4"
Expand Down Expand Up @@ -47,7 +49,7 @@ def get_host(ip: str) -> Optional[str]:
logging.error(f"Error during DNS lookup for {ip}: {e}")


class PassiveNetworkScan(DiscoveryManager):
class PassiveNetworkScan(NetworkDiscoveryManager):
scan_family = "ipv4"

def __init__(
Expand Down Expand Up @@ -87,7 +89,7 @@ def __init__(
f"publish_interval: {self.publish_interval}")
super().__init__(state, publisher)

def start_discovery(self) -> None:
def start_scan(self) -> None:
logging.info("Starting network discovery...")
self.cancel_threads.clear()
self.packets_seen = 0
Expand All @@ -101,7 +103,7 @@ def start_discovery(self) -> None:
self.packet_count_start = self._get_packet_counter_total()
logging.info("Network discovery started.")

def stop_discovery(self) -> None:
def stop_scan(self) -> None:
logging.info("Stopping network discovery...")
self.sniffer.stop()
self.sniffer.join()
Expand Down Expand Up @@ -208,35 +210,36 @@ def queue_worker(self):

def discovery_service(self):
logging.debug("Discovery service thread started.")
while not self.cancel_threads.is_set(): # More robust thread stopping
new_device_records = self.device_records - self.devices_records_published
while not self.cancel_threads.is_set():
new_device_records = (
self.device_records - self.devices_records_published)

if new_device_records:
try:
# Batch publish the records
discovery_events = [
DiscoveryEvents(
generation=self.config.generation,
scan_addr=device_record.addr,
scan_family=self.scan_family,
addr=device_record.addr,
family=self.scan_family,
families=dict(
ether=FamilyDiscovery(addr=device_record.mac)),
)
for device_record in new_device_records
]
for event in discovery_events:
self.publish(event) # Publish all events
self.publish(event)
self.devices_records_published.update(new_device_records)
logging.info(
f"Published {len(new_device_records)} new device records.")
f"Published {len(new_device_records)} new device "
f"records.")

except Exception as e:
logging.error(f"Error publishing discovery events: {e}")

try:
time.sleep(self.publish_interval)
except InterruptedError:
# Handle potential interruption during sleep
pass

logging.debug("Discovery service thread stopped.")

0 comments on commit 7fc3a8d

Please sign in to comment.