Skip to content

Commit

Permalink
integrations now self-launch
Browse files Browse the repository at this point in the history
  • Loading branch information
visualDust committed Nov 26, 2023
1 parent f579b29 commit cbcc66d
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 136 deletions.
14 changes: 8 additions & 6 deletions neetbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import toml

import neetbox.daemon as daemon
import neetbox.integrations as integrations
from neetbox.config import default as default_config
from neetbox.config._config import update_workspace_config_with
from neetbox.logging.formatting import LogStyle
Expand Down Expand Up @@ -30,6 +32,7 @@ def _init_workspace(path=None, **kwargs) -> bool:
_config["name"] = os.path.basename(os.path.normpath(os.getcwd()))
toml.dump(_config, config_file)
logger.ok(f"Workspace config created as {config_file_path}.")
return True
except Exception as e:
logger.err(f"Failed to create {config_file_path}: {e}")
return False
Expand Down Expand Up @@ -61,14 +64,13 @@ def _load_workspace(path=None) -> bool:
)


def _load_workspace_and_attach_daemon():
def _load_workspace_as_a_project():
success = _load_workspace() # init from config file
if not success: # failed to load workspace config, exiting
os._exit(255)
# try attach daemon
from neetbox.daemon import _try_attach_daemon

_try_attach_daemon()
# post init
integrations._post_init_workspace()
daemon._try_attach_daemon()


def _is_in_workspace():
Expand All @@ -79,4 +81,4 @@ def _is_in_workspace():
# running in cli or daemon process, do not load workspace
pass
elif _is_in_workspace(): # if a config file exist and not running in cli
_load_workspace_and_attach_daemon()
_load_workspace_as_a_project()
1 change: 0 additions & 1 deletion neetbox/cli/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ def init(name: str):
"""initialize current folder as workspace and generate the config file from defaults"""
try:
if neetbox._init_workspace(name=name):
logger.skip_lines(2)
logger.console_banner("neetbox", font="ansishadow")
logger.log("Welcome to NEETBOX")
except Exception as e:
Expand Down
6 changes: 3 additions & 3 deletions neetbox/config/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
"version": None,
"logging": {"level": "INFO", "logdir": None},
"pipeline": {
"updateInterval": 10,
"updateInterval": 0.5,
},
"integrations": {
"environment": {"hardware": {"monit": "true"}, "platform": {"monit": "true"}},
"environment": {"hardware": {"monit": True, "interval": 0.5}, "platform": {"monit": True}},
},
"daemon": {
"enable": True,
Expand All @@ -22,7 +22,7 @@
"allowIpython": False,
"mute": True,
"mode": "detached",
"uploadInterval": 10,
"uploadInterval": 0.5,
},
}
WORKSPACE_CONFIG: dict = DEFAULT_WORKSPACE_CONFIG.copy()
Expand Down
47 changes: 8 additions & 39 deletions neetbox/daemon/client/_update_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,17 @@
from neetbox.daemon.server._server import CLIENT_API_ROOT
from neetbox.logging.formatting import LogStyle
from neetbox.logging.logger import Logger
from neetbox.pipeline._signal_and_slot import _UPDATE_VALUE_DICT, SYSTEM_CHANNEL
from neetbox.pipeline._signal_and_slot import _UPDATE_VALUE_DICT, SYSTEM_CHANNEL, watch

logger = Logger(style=LogStyle(with_datetime=False, skip_writers=["ws"]))

__TIME_UNIT_SEC = 0.1

__upload_thread: Union[Thread, None] = None


def _upload_thread(daemon_config, base_addr, display_name):
_ctr = 0
def _add_upload_thread_to_watch(daemon_config, base_addr, display_name):
_api_name = "sync"
_api_addr = f"{base_addr}{CLIENT_API_ROOT}/{_api_name}/{display_name}"
_disconnect_flag = False
_disconnect_retries = 10
while True:
_ctr = (_ctr + 1) % 99999999
_upload_interval = daemon_config["uploadInterval"]
time.sleep(__TIME_UNIT_SEC)
if _ctr % _upload_interval: # not zero
continue

@watch(interval=daemon_config["uploadInterval"], overwrite=True)
def upload_via_http():
# dump status as json
_data = json.dumps(_UPDATE_VALUE_DICT[SYSTEM_CHANNEL], default=str)
_headers = {"Content-Type": "application/json"}
Expand All @@ -44,25 +34,7 @@ def _upload_thread(daemon_config, base_addr, display_name):
if resp.is_error: # upload failed
raise IOError(f"Failed to upload data to daemon. ({resp.status_code})")
except Exception as e:
if _disconnect_flag: # already in retries
_disconnect_retries -= 1
if not _disconnect_retries: # retry count down exceeded
logger.err(
"Failed to reconnect to daemon after {10} retries, Trying to launch new daemon..."
)
from neetbox.daemon import _try_attach_daemon

_try_attach_daemon()
time.sleep(__TIME_UNIT_SEC)
continue
logger.warn(f"Failed to upload data to daemon cause {e}. Waiting for reconnect...")
_disconnect_flag = True
else:
if not _disconnect_flag:
continue
logger.ok("Successfully reconnected to daemon.")
_disconnect_flag = False
_disconnect_retries = 10


def connect_daemon(cfg=None, launch_upload_thread=True):
Expand Down Expand Up @@ -91,11 +63,8 @@ def _check_daemon_alive(_api_addr):
return False

if launch_upload_thread:
global __upload_thread
if __upload_thread is None or not __upload_thread.is_alive():
__upload_thread = Thread(
target=_upload_thread, args=[cfg, _base_addr, _display_name], daemon=True
)
__upload_thread.start()
_add_upload_thread_to_watch(
daemon_config=cfg, base_addr=_base_addr, display_name=_display_name
)

return True
14 changes: 3 additions & 11 deletions neetbox/daemon/server/_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
from flask import abort, json, request, send_from_directory
from websocket_server import WebsocketServer

__DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC = 60 * 60 * 12 # 12 Hours
__COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC
__PROC_NAME = "NEETBOX SERVER"
setproctitle.setproctitle(__PROC_NAME)

Expand Down Expand Up @@ -306,8 +304,6 @@ def just_send_hello():

@app.route(f"{FRONTEND_API_ROOT}/status/<name>", methods=["GET"])
def return_status_of(name):
global __COUNT_DOWN
__COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC
if name in __BRIDGES:
_returning_stat = __BRIDGES[name].status # returning specific status
else:
Expand All @@ -316,26 +312,22 @@ def return_status_of(name):

@app.route(f"{FRONTEND_API_ROOT}/list", methods=["GET"])
def return_names_of_status():
global __COUNT_DOWN
__COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC
_names = {"names": list(__BRIDGES.keys())}
return _names

@app.route(f"{CLIENT_API_ROOT}/sync/<name>", methods=["POST"])
def sync_status_of(name): # client side function
global __COUNT_DOWN
__COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC
print("on sync")
_json_data = request.get_json()
if name not in __BRIDGES: # Client not found
__BRIDGES[name] = Bridge(name=name) # Create from sync request
__BRIDGES[name].status = _json_data
print("on done")

return "ok"

@app.route(f"{FRONTEND_API_ROOT}/shutdown", methods=["POST"])
def shutdown():
global __COUNT_DOWN
__COUNT_DOWN = -1

def __sleep_and_shutdown(secs=3):
time.sleep(secs)
os._exit(0)
Expand Down
25 changes: 25 additions & 0 deletions neetbox/integrations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,36 @@
# URL: https://gong.host
# Date: 20230417

import importlib
import pkgutil

from traitlets import Callable

from neetbox.core import Registry
from neetbox.integrations.engine import Engine as engine
from neetbox.integrations.engine import get_installed_engines, get_supported_engines
from neetbox.utils.framing import get_frame_module_traceback

__all__ = [
"engine",
"get_supported_engines",
"get_installed_engines",
]

_QUERY_AFTER_LOAD_WORKSPACE = Registry("__INTEGRATION_LOADER_DICT")


def _iter_import_sub_modules():
_THIS_MODULE = get_frame_module_traceback(1)
for sub_module_info in pkgutil.iter_modules(_THIS_MODULE.__path__):
importlib.import_module(f"{_THIS_MODULE.__name__}.{sub_module_info.name}")


def _post_init_workspace():
_iter_import_sub_modules()
for name, fun in _QUERY_AFTER_LOAD_WORKSPACE.items():
fun()


call_on_workspace_load = _QUERY_AFTER_LOAD_WORKSPACE.register
__all__ = ["call_on_workspace_load"]
2 changes: 2 additions & 0 deletions neetbox/integrations/environment/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import neetbox.integrations.environment.hardware
import neetbox.integrations.environment.platform
19 changes: 14 additions & 5 deletions neetbox/integrations/environment/hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import psutil
from GPUtil import GPU

from neetbox.config import get_module_level_config
from neetbox.integrations import call_on_workspace_load
from neetbox.pipeline import watch
from neetbox.pipeline._signal_and_slot import SYSTEM_CHANNEL
from neetbox.utils import pkg
Expand Down Expand Up @@ -82,13 +84,13 @@ def __init__(self) -> None:
"free": ram_stat[4] / 1e9,
}
# the environment shoube be imported in the __init__.py of the outer module. And the watcher thread should be auto started
self.set_update_intervel()
# self.set_update_intervel() # do not watch by default

def json(self):
return {"cpus": self["cpus"], "ram": self["ram"], "gpus": self["gpus"]}

def set_update_intervel(self, intervel=1.0) -> None:
if intervel < 1.0:
if intervel < 0:
self._do_watch = False
return
self._do_watch = True
Expand Down Expand Up @@ -131,6 +133,13 @@ def watcher_fun(env_instance: _Hardware, do_update_gpus: bool):


# watch updates in daemon
@watch(name="hardware", _channel=SYSTEM_CHANNEL)
def update_env_stat():
return hardware.json()
@call_on_workspace_load(name="hardware-monit")
def load_monit_hardware():
cfg = get_module_level_config()
if cfg["monit"]: # if do monit hardware
hardware.set_update_intervel(cfg["interval"])

# watch updates in daemon
@watch(name="hardware", _channel=SYSTEM_CHANNEL, interval=cfg["interval"])
def update_env_stat():
return hardware.json()
16 changes: 11 additions & 5 deletions neetbox/integrations/environment/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import platform
import subprocess

from neetbox.config import get_module_level_config
from neetbox.integrations import call_on_workspace_load
from neetbox.pipeline import watch
from neetbox.pipeline._signal_and_slot import SYSTEM_CHANNEL
from neetbox.utils.mvc import Singleton
Expand Down Expand Up @@ -56,9 +58,13 @@ def exec(self, command):


# watch updates in daemon
@watch(name="platform", initiative=True, _channel=SYSTEM_CHANNEL)
def update_env_stat():
return dict(platform)
@call_on_workspace_load(name="show-platform-information")
def load_monit_hardware():
cfg = get_module_level_config()
if cfg["monit"]: # if do monit hardware
# watch updates in daemon
@watch(name="platform", initiative=True, _channel=SYSTEM_CHANNEL)
def update_env_stat():
return dict(platform)


update_env_stat()
update_env_stat() # call once
8 changes: 6 additions & 2 deletions neetbox/logging/_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,19 @@ def json(self) -> dict:
# prefix
_prefix = self.prefix or _default_style.prefix
# composing datetime
_with_datetime = self.with_datetime or _default_style.with_datetime
_with_datetime = (
_default_style.with_datetime if self.with_datetime is None else self.with_datetime
)
_datetime = ""
if _with_datetime:
_datetime_fmt = self.datetime_format or _default_style.datetime_format
_datetime = datetime.now().strftime(_datetime_fmt)

# composing identifier
_whom = ""
_with_identifier = self.with_identifier or _default_style.with_identifier
_with_identifier = (
_default_style.with_identifier if self.with_identifier is None else self.with_identifier
)
if _with_identifier:
_caller_identity = self.caller_identity
_whom = str(self.whom) # check identity
Expand Down
Loading

0 comments on commit cbcc66d

Please sign in to comment.