From cbcc66d67040e7cd6239ce2a494e2f1bd6680ce7 Mon Sep 17 00:00:00 2001 From: VisualDust Date: Sun, 26 Nov 2023 23:54:06 +0800 Subject: [PATCH] integrations now self-launch --- neetbox/__init__.py | 14 ++-- neetbox/cli/parse.py | 1 - neetbox/config/_config.py | 6 +- neetbox/daemon/client/_update_thread.py | 47 ++---------- neetbox/daemon/server/_server.py | 14 +--- neetbox/integrations/__init__.py | 25 ++++++ neetbox/integrations/environment/__init__.py | 2 + neetbox/integrations/environment/hardware.py | 19 +++-- neetbox/integrations/environment/platform.py | 16 ++-- neetbox/logging/_writer.py | 8 +- neetbox/pipeline/_signal_and_slot.py | 80 ++++++++++++-------- tests/client/neetbox.toml | 11 +-- tests/client/test.py | 28 +------ 13 files changed, 135 insertions(+), 136 deletions(-) create mode 100644 neetbox/integrations/environment/__init__.py diff --git a/neetbox/__init__.py b/neetbox/__init__.py index 3ee8d1e..032427d 100644 --- a/neetbox/__init__.py +++ b/neetbox/__init__.py @@ -3,6 +3,8 @@ import toml +import neetbox.daemon as daemon +import neetbox.integrations as integrations from neetbox.config import default as default_config from neetbox.config._config import update_workspace_config_with from neetbox.logging.formatting import LogStyle @@ -30,6 +32,7 @@ def _init_workspace(path=None, **kwargs) -> bool: _config["name"] = os.path.basename(os.path.normpath(os.getcwd())) toml.dump(_config, config_file) logger.ok(f"Workspace config created as {config_file_path}.") + return True except Exception as e: logger.err(f"Failed to create {config_file_path}: {e}") return False @@ -61,14 +64,13 @@ def _load_workspace(path=None) -> bool: ) -def _load_workspace_and_attach_daemon(): +def _load_workspace_as_a_project(): success = _load_workspace() # init from config file if not success: # failed to load workspace config, exiting os._exit(255) - # try attach daemon - from neetbox.daemon import _try_attach_daemon - - _try_attach_daemon() + # post init + integrations._post_init_workspace() + daemon._try_attach_daemon() def _is_in_workspace(): @@ -79,4 +81,4 @@ def _is_in_workspace(): # running in cli or daemon process, do not load workspace pass elif _is_in_workspace(): # if a config file exist and not running in cli - _load_workspace_and_attach_daemon() + _load_workspace_as_a_project() diff --git a/neetbox/cli/parse.py b/neetbox/cli/parse.py index f12cdbc..bac54fd 100644 --- a/neetbox/cli/parse.py +++ b/neetbox/cli/parse.py @@ -78,7 +78,6 @@ def init(name: str): """initialize current folder as workspace and generate the config file from defaults""" try: if neetbox._init_workspace(name=name): - logger.skip_lines(2) logger.console_banner("neetbox", font="ansishadow") logger.log("Welcome to NEETBOX") except Exception as e: diff --git a/neetbox/config/_config.py b/neetbox/config/_config.py index 38c9021..f1c20f2 100644 --- a/neetbox/config/_config.py +++ b/neetbox/config/_config.py @@ -9,10 +9,10 @@ "version": None, "logging": {"level": "INFO", "logdir": None}, "pipeline": { - "updateInterval": 10, + "updateInterval": 0.5, }, "integrations": { - "environment": {"hardware": {"monit": "true"}, "platform": {"monit": "true"}}, + "environment": {"hardware": {"monit": True, "interval": 0.5}, "platform": {"monit": True}}, }, "daemon": { "enable": True, @@ -22,7 +22,7 @@ "allowIpython": False, "mute": True, "mode": "detached", - "uploadInterval": 10, + "uploadInterval": 0.5, }, } WORKSPACE_CONFIG: dict = DEFAULT_WORKSPACE_CONFIG.copy() diff --git a/neetbox/daemon/client/_update_thread.py b/neetbox/daemon/client/_update_thread.py index da4d548..0a3b60d 100644 --- a/neetbox/daemon/client/_update_thread.py +++ b/neetbox/daemon/client/_update_thread.py @@ -14,27 +14,17 @@ from neetbox.daemon.server._server import CLIENT_API_ROOT from neetbox.logging.formatting import LogStyle from neetbox.logging.logger import Logger -from neetbox.pipeline._signal_and_slot import _UPDATE_VALUE_DICT, SYSTEM_CHANNEL +from neetbox.pipeline._signal_and_slot import _UPDATE_VALUE_DICT, SYSTEM_CHANNEL, watch logger = Logger(style=LogStyle(with_datetime=False, skip_writers=["ws"])) -__TIME_UNIT_SEC = 0.1 -__upload_thread: Union[Thread, None] = None - - -def _upload_thread(daemon_config, base_addr, display_name): - _ctr = 0 +def _add_upload_thread_to_watch(daemon_config, base_addr, display_name): _api_name = "sync" _api_addr = f"{base_addr}{CLIENT_API_ROOT}/{_api_name}/{display_name}" - _disconnect_flag = False - _disconnect_retries = 10 - while True: - _ctr = (_ctr + 1) % 99999999 - _upload_interval = daemon_config["uploadInterval"] - time.sleep(__TIME_UNIT_SEC) - if _ctr % _upload_interval: # not zero - continue + + @watch(interval=daemon_config["uploadInterval"], overwrite=True) + def upload_via_http(): # dump status as json _data = json.dumps(_UPDATE_VALUE_DICT[SYSTEM_CHANNEL], default=str) _headers = {"Content-Type": "application/json"} @@ -44,25 +34,7 @@ def _upload_thread(daemon_config, base_addr, display_name): if resp.is_error: # upload failed raise IOError(f"Failed to upload data to daemon. ({resp.status_code})") except Exception as e: - if _disconnect_flag: # already in retries - _disconnect_retries -= 1 - if not _disconnect_retries: # retry count down exceeded - logger.err( - "Failed to reconnect to daemon after {10} retries, Trying to launch new daemon..." - ) - from neetbox.daemon import _try_attach_daemon - - _try_attach_daemon() - time.sleep(__TIME_UNIT_SEC) - continue logger.warn(f"Failed to upload data to daemon cause {e}. Waiting for reconnect...") - _disconnect_flag = True - else: - if not _disconnect_flag: - continue - logger.ok("Successfully reconnected to daemon.") - _disconnect_flag = False - _disconnect_retries = 10 def connect_daemon(cfg=None, launch_upload_thread=True): @@ -91,11 +63,8 @@ def _check_daemon_alive(_api_addr): return False if launch_upload_thread: - global __upload_thread - if __upload_thread is None or not __upload_thread.is_alive(): - __upload_thread = Thread( - target=_upload_thread, args=[cfg, _base_addr, _display_name], daemon=True - ) - __upload_thread.start() + _add_upload_thread_to_watch( + daemon_config=cfg, base_addr=_base_addr, display_name=_display_name + ) return True diff --git a/neetbox/daemon/server/_server.py b/neetbox/daemon/server/_server.py index db0923f..a5b9597 100644 --- a/neetbox/daemon/server/_server.py +++ b/neetbox/daemon/server/_server.py @@ -32,8 +32,6 @@ from flask import abort, json, request, send_from_directory from websocket_server import WebsocketServer -__DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC = 60 * 60 * 12 # 12 Hours -__COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC __PROC_NAME = "NEETBOX SERVER" setproctitle.setproctitle(__PROC_NAME) @@ -306,8 +304,6 @@ def just_send_hello(): @app.route(f"{FRONTEND_API_ROOT}/status/", methods=["GET"]) def return_status_of(name): - global __COUNT_DOWN - __COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC if name in __BRIDGES: _returning_stat = __BRIDGES[name].status # returning specific status else: @@ -316,26 +312,22 @@ def return_status_of(name): @app.route(f"{FRONTEND_API_ROOT}/list", methods=["GET"]) def return_names_of_status(): - global __COUNT_DOWN - __COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC _names = {"names": list(__BRIDGES.keys())} return _names @app.route(f"{CLIENT_API_ROOT}/sync/", methods=["POST"]) def sync_status_of(name): # client side function - global __COUNT_DOWN - __COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC + print("on sync") _json_data = request.get_json() if name not in __BRIDGES: # Client not found __BRIDGES[name] = Bridge(name=name) # Create from sync request __BRIDGES[name].status = _json_data + print("on done") + return "ok" @app.route(f"{FRONTEND_API_ROOT}/shutdown", methods=["POST"]) def shutdown(): - global __COUNT_DOWN - __COUNT_DOWN = -1 - def __sleep_and_shutdown(secs=3): time.sleep(secs) os._exit(0) diff --git a/neetbox/integrations/__init__.py b/neetbox/integrations/__init__.py index 0fa6ab7..0803657 100644 --- a/neetbox/integrations/__init__.py +++ b/neetbox/integrations/__init__.py @@ -4,11 +4,36 @@ # URL: https://gong.host # Date: 20230417 +import importlib +import pkgutil + +from traitlets import Callable + +from neetbox.core import Registry from neetbox.integrations.engine import Engine as engine from neetbox.integrations.engine import get_installed_engines, get_supported_engines +from neetbox.utils.framing import get_frame_module_traceback __all__ = [ "engine", "get_supported_engines", "get_installed_engines", ] + +_QUERY_AFTER_LOAD_WORKSPACE = Registry("__INTEGRATION_LOADER_DICT") + + +def _iter_import_sub_modules(): + _THIS_MODULE = get_frame_module_traceback(1) + for sub_module_info in pkgutil.iter_modules(_THIS_MODULE.__path__): + importlib.import_module(f"{_THIS_MODULE.__name__}.{sub_module_info.name}") + + +def _post_init_workspace(): + _iter_import_sub_modules() + for name, fun in _QUERY_AFTER_LOAD_WORKSPACE.items(): + fun() + + +call_on_workspace_load = _QUERY_AFTER_LOAD_WORKSPACE.register +__all__ = ["call_on_workspace_load"] diff --git a/neetbox/integrations/environment/__init__.py b/neetbox/integrations/environment/__init__.py new file mode 100644 index 0000000..5794975 --- /dev/null +++ b/neetbox/integrations/environment/__init__.py @@ -0,0 +1,2 @@ +import neetbox.integrations.environment.hardware +import neetbox.integrations.environment.platform diff --git a/neetbox/integrations/environment/hardware.py b/neetbox/integrations/environment/hardware.py index 594b705..d67d0d3 100644 --- a/neetbox/integrations/environment/hardware.py +++ b/neetbox/integrations/environment/hardware.py @@ -12,6 +12,8 @@ import psutil from GPUtil import GPU +from neetbox.config import get_module_level_config +from neetbox.integrations import call_on_workspace_load from neetbox.pipeline import watch from neetbox.pipeline._signal_and_slot import SYSTEM_CHANNEL from neetbox.utils import pkg @@ -82,13 +84,13 @@ def __init__(self) -> None: "free": ram_stat[4] / 1e9, } # the environment shoube be imported in the __init__.py of the outer module. And the watcher thread should be auto started - self.set_update_intervel() + # self.set_update_intervel() # do not watch by default def json(self): return {"cpus": self["cpus"], "ram": self["ram"], "gpus": self["gpus"]} def set_update_intervel(self, intervel=1.0) -> None: - if intervel < 1.0: + if intervel < 0: self._do_watch = False return self._do_watch = True @@ -131,6 +133,13 @@ def watcher_fun(env_instance: _Hardware, do_update_gpus: bool): # watch updates in daemon -@watch(name="hardware", _channel=SYSTEM_CHANNEL) -def update_env_stat(): - return hardware.json() +@call_on_workspace_load(name="hardware-monit") +def load_monit_hardware(): + cfg = get_module_level_config() + if cfg["monit"]: # if do monit hardware + hardware.set_update_intervel(cfg["interval"]) + + # watch updates in daemon + @watch(name="hardware", _channel=SYSTEM_CHANNEL, interval=cfg["interval"]) + def update_env_stat(): + return hardware.json() diff --git a/neetbox/integrations/environment/platform.py b/neetbox/integrations/environment/platform.py index 8913454..75a76e4 100644 --- a/neetbox/integrations/environment/platform.py +++ b/neetbox/integrations/environment/platform.py @@ -9,6 +9,8 @@ import platform import subprocess +from neetbox.config import get_module_level_config +from neetbox.integrations import call_on_workspace_load from neetbox.pipeline import watch from neetbox.pipeline._signal_and_slot import SYSTEM_CHANNEL from neetbox.utils.mvc import Singleton @@ -56,9 +58,13 @@ def exec(self, command): # watch updates in daemon -@watch(name="platform", initiative=True, _channel=SYSTEM_CHANNEL) -def update_env_stat(): - return dict(platform) +@call_on_workspace_load(name="show-platform-information") +def load_monit_hardware(): + cfg = get_module_level_config() + if cfg["monit"]: # if do monit hardware + # watch updates in daemon + @watch(name="platform", initiative=True, _channel=SYSTEM_CHANNEL) + def update_env_stat(): + return dict(platform) - -update_env_stat() + update_env_stat() # call once diff --git a/neetbox/logging/_writer.py b/neetbox/logging/_writer.py index a664981..db9c893 100644 --- a/neetbox/logging/_writer.py +++ b/neetbox/logging/_writer.py @@ -48,7 +48,9 @@ def json(self) -> dict: # prefix _prefix = self.prefix or _default_style.prefix # composing datetime - _with_datetime = self.with_datetime or _default_style.with_datetime + _with_datetime = ( + _default_style.with_datetime if self.with_datetime is None else self.with_datetime + ) _datetime = "" if _with_datetime: _datetime_fmt = self.datetime_format or _default_style.datetime_format @@ -56,7 +58,9 @@ def json(self) -> dict: # composing identifier _whom = "" - _with_identifier = self.with_identifier or _default_style.with_identifier + _with_identifier = ( + _default_style.with_identifier if self.with_identifier is None else self.with_identifier + ) if _with_identifier: _caller_identity = self.caller_identity _whom = str(self.whom) # check identity diff --git a/neetbox/pipeline/_signal_and_slot.py b/neetbox/pipeline/_signal_and_slot.py index 757021e..334df13 100644 --- a/neetbox/pipeline/_signal_and_slot.py +++ b/neetbox/pipeline/_signal_and_slot.py @@ -8,6 +8,7 @@ import time from dataclasses import dataclass from datetime import datetime +from decimal import Decimal from functools import partial from threading import Thread from typing import Any, Callable, Optional, Union @@ -17,8 +18,8 @@ from neetbox.core import Registry from neetbox.logging import logger -__TIME_CTR_MAX_CYCLE = 9999999 -__TIME_UNIT_SEC = 0.1 +__TIME_CTR_MAX_CYCLE = Decimal("99999.0") +__TIME_UNIT_SEC = Decimal("0.1") _WATCH_QUERY_DICT = Registry("__pipeline_watch") _LISTEN_QUERY_DICT = collections.defaultdict(lambda: {}) @@ -33,7 +34,7 @@ @dataclass class _WatchConfig(dict): name: str - interval: int + interval: Decimal initiative: bool channel: str = _DEFAULT_CHANNEL # use channel to distinct those values to upload via http @@ -63,26 +64,35 @@ def __update_and_get(name: str, *args, **kwargs): _watch_config = _watched_fun.cfg _channel = _watch_config.channel _the_value = _watched_fun(*args, **kwargs) - _UPDATE_VALUE_DICT[_channel][name] = { - "value": _the_value, - "timestamp": datetime.timestamp(datetime.now()), - "interval": (_watch_config.interval * __TIME_UNIT_SEC), - } - - def __call_listeners(_name: str, _value, _cfg: _WatchConfig): - t0 = time.perf_counter() - for _listener_name, _listener_func in _LISTEN_QUERY_DICT[_name].items(): - _listener_func(_value) - t1 = time.perf_counter() - delta_t = t1 - t0 - _update_interval = _cfg.interval - expected_time_limit = _update_interval * __TIME_UNIT_SEC - if not _cfg.initiative >= 0 and delta_t > expected_time_limit: - logger.warn( - f"Watched value {_name} takes longer time({delta_t:.8f}s) to update than it was expected({expected_time_limit}s)." - ) - Thread(target=__call_listeners, args=(name, _the_value, _watch_config), daemon=True).start() + def _inside_thread(): + _UPDATE_VALUE_DICT[_channel][name] = { + "value": _the_value, + "timestamp": datetime.timestamp(datetime.now()), + "interval": None + if _watch_config.interval is None + else (_watch_config.interval * __TIME_UNIT_SEC), + } + + def __call_listeners(_name: str, _value, _cfg: _WatchConfig): + t0 = time.perf_counter() + for _listener_name, _listener_func in _LISTEN_QUERY_DICT[_name].items(): + _listener_func(_value) + t1 = time.perf_counter() + delta_t = t1 - t0 + if _cfg.initiative: + return + _update_interval = _cfg.interval + expected_time_limit = _update_interval * __TIME_UNIT_SEC + if delta_t > expected_time_limit: + logger.warn( + f"Watched value {_name} takes longer time({delta_t:.8f}s) to update than it was expected ({expected_time_limit}s)." + ) + + Thread(target=__call_listeners, args=(name, _the_value, _watch_config), daemon=True).start() + + Thread(target=_inside_thread, daemon=True).start() + return _the_value @@ -100,7 +110,12 @@ def _watch( name=name, what=_WatchedFun( func=func, - cfg=_WatchConfig(name, interval=interval, initiative=initiative, channel=_channel), + cfg=_WatchConfig( + name, + interval=None if interval is None else Decimal(str(interval)), + initiative=initiative, + channel=_channel, + ), ), overwrite=overwrite, tags=_channel, @@ -111,8 +126,9 @@ def _watch( ) return partial(__update_and_get, name) else: + _update_latency = Decimal(str(interval)) * __TIME_UNIT_SEC logger.debug( - f"added {name} to daemon monitor. It will update every {interval*__TIME_UNIT_SEC} second(s)." + f"added {name} to daemon monitor. It will update every {_update_latency} second(s)." ) return partial(__get, name, _channel) @@ -124,10 +140,8 @@ def watch( overwrite: bool = False, _channel: str = None, ): - if not initiative: # passively update - interval = interval or get_module_level_config()["updateInterval"] - else: - interval = -1 + # set interval to None if passively update + interval = None if initiative else (interval or get_module_level_config()["updateInterval"]) return partial( _watch, name=name, @@ -174,13 +188,15 @@ def listen(target: Union[str, Any], listener_name: Optional[str] = None, overwri def _update_thread(): # update values - _ctr = 0 + _ctr = Decimal("0.0") while True: - _ctr = (_ctr + 1) % __TIME_CTR_MAX_CYCLE - time.sleep(__TIME_UNIT_SEC) + _ctr = (_ctr + __TIME_UNIT_SEC) % __TIME_CTR_MAX_CYCLE + time.sleep(float(__TIME_UNIT_SEC)) for _vname, _watched_fun in _WATCH_QUERY_DICT.items(): _watch_config = _watched_fun.cfg - if not _watch_config.initiative and _ctr % _watch_config.interval == 0: # do update + if not _watch_config.initiative and _ctr % _watch_config.interval == Decimal( + "0.0" + ): # do update _ = __update_and_get(_vname) diff --git a/tests/client/neetbox.toml b/tests/client/neetbox.toml index f726738..d7826f4 100644 --- a/tests/client/neetbox.toml +++ b/tests/client/neetbox.toml @@ -1,10 +1,10 @@ name = "neet_test" [logging] -level = "INFO" +level = "DEBUG" [pipeline] -updateInterval = 10 +updateInterval = 0.6 [daemon] enable = true @@ -13,10 +13,11 @@ port = 5000 allowIpython = false mute = true mode = "detached" -uploadInterval = 10 +uploadInterval = 0.6 [integrations.environment.hardware] -monit = "true" +monit = true +interval = 0.5 [integrations.environment.platform] -monit = "true" +monit = true diff --git a/tests/client/test.py b/tests/client/test.py index ad89b98..128917b 100644 --- a/tests/client/test.py +++ b/tests/client/test.py @@ -4,7 +4,6 @@ from time import sleep from neetbox.daemon import action -from neetbox.integrations.environment import hardware, platform from neetbox.logging import logger from neetbox.pipeline import listen, watch @@ -20,7 +19,7 @@ def print_to_console(metrix): logger.log(f"metrix from train: {metrix}") -@watch("log-some-prefix", interval=50) +@watch("log-some-prefix", interval=5.0) def log_with_some_prefix(): logger.ok("some ok") logger.info("some info") @@ -29,31 +28,6 @@ def log_with_some_prefix(): logger.err("some error") -@watch(interval=40) -def log_with_some_prefix_1(): - logger.ok("some ok") - - -@watch(interval=50) -def log_with_some_prefix_2(): - logger.ok("some ok") - - -@watch(interval=60) -def log_with_some_prefix_200(): - logger.ok("some ok") - - -@watch(interval=88) -def log_with_some_prefix_333(): - logger.ok("some ok") - - -@watch(interval=30) -def log_with_some_prefix_500(): - logger.ok("some ok") - - @action(name="action-1") def action_1(text: str): """take action 1