diff --git a/README.md b/README.md index 185388b..291a272 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,34 @@ [![wakatime](https://wakatime.com/badge/user/b93a26b6-8ea1-44ef-99ed-bcb6e2c732f1/project/8f99904d-dbb1-49e4-814d-8d18bf1e6d1c.svg)](https://wakatime.com/badge/user/b93a26b6-8ea1-44ef-99ed-bcb6e2c732f1/project/8f99904d-dbb1-49e4-814d-8d18bf1e6d1c) -~~一个自产自销的仓库~~ Logging/Debugging/Tracing/Managing/Facilitating your deep learning projects +![](./doc/static/img/readme.png) -A small part of the documentation at [neetbox.550w.host](https://neetbox.550w.host). (We are not ready for the doc yet) +## docs & quick start + +Logging/Debugging/Tracing/Managing/Facilitating your deep learning projects. A small part of the documentation at [neetbox.550w.host](https://neetbox.550w.host). (We are not ready for the doc yet) + +## installation + +```bash +pip install neetbox +``` + +## use neetbox in your project + +in your project folder: +``` +neet init +``` +neetbox cli generates a config file for your project named `neetbox.toml` + +in your code: +```python +import neetbox +``` + +## usage examples + +[how to guides](todo) provides easy examples of basic neetbox funcionalities. ## Star History diff --git a/doc/docs/develop/daemon/index.md b/doc/docs/develop/daemon/index.md new file mode 100644 index 0000000..a26d1a6 --- /dev/null +++ b/doc/docs/develop/daemon/index.md @@ -0,0 +1,140 @@ +# Testing DAEMON + +NEETBOX daemon consists of client side and server side. While client side syncs status of running project and platform information including hardware, server side provides apis for status monitoring and websocket forcasting between client and frontends. + +Basically neetbox will also launch a backend on localhost when a project launching configured with daemon server address at localhost. The server will run in background without any output, and you may want to run a server with output for debug purposes. + +## How to test neetbox server + +at neetbox project root: + +```bash +python neetbox/daemon/server/_server.py +``` + +script above should launch a server in debug mode on `localhost:5000`, it wont read the port in `neetbox.toml`. a swegger UI is provided at [localhost:5000/docs](http://127.0.0.1:5000/docs) in debug mode. websocket server should run on port `5001`. + +If you want to simulate a basic neetbox client sending message to server, at neetbox project root: +```bash +cd tests/client +python test.py +``` +script above should launch a simple case of neetbox project with some logs and status sending to server. + +## Websocket message standard + +websocke messages are described in json. There is a dataclass representing websocket message: + +```python +@dataclass +class WsMsg: + event_type: str + payload: Any + event_id: int = -1 + + def json(self): + return { + EVENT_TYPE_NAME_KEY: self.event_type, + EVENT_ID_NAME_KEY: self.event_id, + PAYLOAD_NAME_KEY: self.payload, + } +``` + +```json +{ + "event-type" : ..., + "payload" : ..., + "event-id" : ... +} +``` + +| key | value type | description | +| :--------: | :--------: | :----------------------------------------------------: | +| event-type | string | indicate type of data in payload | +| payload | string | actual data | +| event-id | int | for events who need ack. default -1 means no event id. | + +## Event types + +the table is increasing. a frequent check would keep you up to date. + +| event-type | accepting direction | means | +| :--------: | :---------------------------: | :----------------------------------------------------------: | +| handshake | cli <--> server <--> frontend | string in `payload` indicate connection type ('cli'/'web') | +| log | cli -> server -> frontend | `payload` contains log data | +| action | cli <- server <- frontend | `payload` contains action trigger | +| ack | cli <--> server <--> frontend | `payload` contains ack, and `event-id` should be a valid key | + +## Examples of websocket data + +### handshake + +for instance, frontend connected to server. frontend should report connection type immediately by sending: + +```json +{ + "event-type": "handshake", + "name": "project name", + "payload": { + "who": "web" + }, + "event-id": X +} +``` + +where `event-id` is used to send ack to the starter of the connection, it should be a random int value. + +### cli sending log to frontend + +cli sents log(s) via websocket, server will receives and broadcast this message to related frontends. cli should send: + +```json +{ + "event-type": "log", + "name": "project name", + "payload": { + "log" : {...json representing log data...} + }, + "event-id": -1 +} +``` + +where `event-id` is a useless segment, leave it default. it's okay if nobody receives log. + +### frontend(s) querys action to cli + +frontend send action request to server, and server will forwards the message to cli. frontend should send: + +```json +{ + "event-type" : "action", + "name": "project name", + "payload" : { + "action" : {...json representing action trigger...} + }, + "event-id" : x +} +``` + +front may want to know the result of action. for example, whether the action was invoked successfully. therefore, `event-id` is necessary for cli to shape a ack response. + +### cli acks frontend action query + +cli execute action query(s) from frontend, and gives response by sending ack: + +```json +{ + "event-type" : "ack", + "name": "project name", + "payload" : { + "action" : {...json representing action result...} + }, + "event-id" : x +} +``` + +where `event-id` is same as received action query. + +--- + +Those are only examples. use them wisely. diff --git a/doc/docs/guide/index.md b/doc/docs/guide/index.md index 7672f70..543e5ec 100644 --- a/doc/docs/guide/index.md +++ b/doc/docs/guide/index.md @@ -10,12 +10,6 @@ sidebar_position: 1 pip install neetbox ``` -If you want to enable torch-related feature, please try below: - -```bash -pip install neetbox[torch] -``` - Since NEETBOX is under ~~heavy~~ development, it's better to forcely reinstall the newest version: ```bash diff --git a/doc/static/img/readme.png b/doc/static/img/readme.png new file mode 100644 index 0000000..1fb99c5 Binary files /dev/null and b/doc/static/img/readme.png differ diff --git a/neetbox/__init__.py b/neetbox/__init__.py index 554e74c..e73a65d 100644 --- a/neetbox/__init__.py +++ b/neetbox/__init__.py @@ -12,18 +12,6 @@ config_file_name = f"{module}.toml" -def post_init(): - import setproctitle - - project_name = get_module_level_config()["name"] - setproctitle.setproctitle(project_name) - - from neetbox.daemon.client._connection import connection - - # post init ws - connection._init_ws() - - def init(path=None, load=False, **kwargs) -> bool: if path: os.chdir(path=path) @@ -77,11 +65,24 @@ def init(path=None, load=False, **kwargs) -> bool: raise e +def post_init(): + import setproctitle + + project_name = get_module_level_config()["name"] + setproctitle.setproctitle(project_name) + + from neetbox.daemon.client._client import connection + + # post init ws + connection._init_ws() + + is_in_daemon_process = ( "NEETBOX_DAEMON_PROCESS" in os.environ and os.environ["NEETBOX_DAEMON_PROCESS"] == "1" ) # print('prevent_daemon_loading =', is_in_daemon_process) if os.path.isfile(config_file_name) and not is_in_daemon_process: # if in a workspace + # todo check if running in cli mode success = init(load=True) # init from config file if not success: os._exit(255) diff --git a/neetbox/daemon/__init__.py b/neetbox/daemon/__init__.py index d67777f..3402deb 100644 --- a/neetbox/daemon/__init__.py +++ b/neetbox/daemon/__init__.py @@ -9,8 +9,8 @@ import time from neetbox.daemon.client._action_agent import _NeetActionManager as NeetActionManager -from neetbox.daemon.client._connection import connection -from neetbox.daemon.client._daemon_client import connect_daemon +from neetbox.daemon.client._client import connection +from neetbox.daemon.client._update_thread import connect_daemon from neetbox.daemon.server.daemonable_process import DaemonableProcess from neetbox.logging import logger from neetbox.pipeline import listen, watch diff --git a/neetbox/daemon/_agent.py b/neetbox/daemon/_agent.py deleted file mode 100644 index 09e6ec2..0000000 --- a/neetbox/daemon/_agent.py +++ /dev/null @@ -1,71 +0,0 @@ -import functools -import inspect -from ast import literal_eval -from threading import Thread -from typing import Callable, Optional - -from neetbox.core import Registry -from neetbox.logging import logger -from neetbox.utils.mvc import Singleton - - -class PackedAction(Callable): - def __init__(self, function: Callable, name=None, **kwargs): - super().__init__(**kwargs) - self.function = function - self.name = name if name else function.__name__ - self.argspec = inspect.getfullargspec(self.function) - - def __call__(self, **argv): - self.function(argv) - - def eval_call(self, params: dict): - eval_params = dict((k, literal_eval(v)) for k, v in params.items()) - return self.function(**eval_params) - - -class _NeetAction(metaclass=Singleton): - __ACTION_POOL: Registry = Registry("__NEET_ACTIONS") - - def register( - self, - *, - name: Optional[str] = None, - ): - return functools.partial(self._register, name=name) - - def _register(self, function: Callable, name: str = None): - packed = PackedAction(function=function, name=name) - _NeetAction.__ACTION_POOL._register(what=packed, name=packed.name, force=True) - return function - - def get_actions(self): - action_names = _NeetAction.__ACTION_POOL.keys() - actions = {} - for n in action_names: - actions[n] = _NeetAction.__ACTION_POOL[n].argspec - return actions - - def eval_call(self, name: str, params: dict): - if name not in _NeetAction.__ACTION_POOL: - logger.err(f"Could not find action with name {name}, action stopped.") - return False - return _NeetAction.__ACTION_POOL[name].eval_call(params) - - -# singleton -neet_action = _NeetAction() - - -# example -if __name__ == "__main__": - - @neet_action.register(name="some") - def some(a, b): - print(a, b) - - print("registered actions:") - print(neet_action.get_actions()) - - print("calling 'some") - neet_action.eval_call("some", {"a": "3", "b": "4"}) diff --git a/neetbox/daemon/_protocol.py b/neetbox/daemon/_protocol.py new file mode 100644 index 0000000..ef1ea93 --- /dev/null +++ b/neetbox/daemon/_protocol.py @@ -0,0 +1,27 @@ +from dataclasses import dataclass +from typing import Any + +FRONTEND_API_ROOT = "/web" +CLIENT_API_ROOT = "/cli" + + +EVENT_TYPE_NAME_KEY = "event-type" +EVENT_ID_NAME_KEY = "event-id" +NAME_NAME_KEY = "name" +PAYLOAD_NAME_KEY = "payload" + + +@dataclass +class WsMsg: + name: str + event_type: str + payload: Any + event_id: int = -1 + + def json(self): + return { + NAME_NAME_KEY: self.name, + EVENT_TYPE_NAME_KEY: self.event_type, + EVENT_ID_NAME_KEY: self.event_id, + PAYLOAD_NAME_KEY: self.payload, + } diff --git a/neetbox/daemon/client/_action_agent.py b/neetbox/daemon/client/_action_agent.py index 25c21eb..8a257a3 100644 --- a/neetbox/daemon/client/_action_agent.py +++ b/neetbox/daemon/client/_action_agent.py @@ -5,6 +5,8 @@ from typing import Callable, Optional from neetbox.core import Registry +from neetbox.daemon._protocol import * +from neetbox.daemon.client._client import connection from neetbox.logging import logger from neetbox.pipeline import watch from neetbox.utils.mvc import Singleton @@ -41,7 +43,7 @@ def get_action_dict(): action_names = _NeetActionManager.__ACTION_POOL.keys() for name in action_names: action = _NeetActionManager.__ACTION_POOL[name] - action_dict[name] = action.argspec.args + action_dict[name] = {"args": action.argspec.args, "blocking": action.blocking} return action_dict def eval_call(name: str, params: dict, callback: None): @@ -83,6 +85,21 @@ def _register(function: Callable, name: str = None, blocking: bool = False): return function +@connection.ws_subscribe(event_type_name="action") +def __listen_to_actions(msg): + _payload = msg[PAYLOAD_NAME_KEY] + _event_id = msg[EVENT_ID_NAME_KEY] + _action_name = _payload["name"] + _action_args = _payload["args"] + _NeetActionManager.eval_call( + name=_action_name, + params=_action_args, + callback=lambda x: connection.ws_send( + event_type="action", payload={"name": _action_name, "result": x}, _event_id=_event_id + ), + ) + + # example if __name__ == "__main__": import time diff --git a/neetbox/daemon/client/_connection.py b/neetbox/daemon/client/_client.py similarity index 74% rename from neetbox/daemon/client/_connection.py rename to neetbox/daemon/client/_client.py index 42c2221..ca3597d 100644 --- a/neetbox/daemon/client/_connection.py +++ b/neetbox/daemon/client/_client.py @@ -1,17 +1,15 @@ -import asyncio import functools import json import logging -import time -from dataclasses import dataclass +from collections import defaultdict from threading import Thread -from typing import Any, Callable, Optional +from typing import Callable import httpx import websocket from neetbox.config import get_module_level_config -from neetbox.core import Registry +from neetbox.daemon._protocol import * from neetbox.daemon.server._server import CLIENT_API_ROOT from neetbox.logging import logger from neetbox.utils.mvc import Singleton @@ -19,34 +17,31 @@ httpx_logger = logging.getLogger("httpx") httpx_logger.setLevel(logging.ERROR) -EVENT_TYPE_NAME_KEY = "event-type" -EVENT_ID_NAME_KEY = "event-id" -NAME_NAME_KEY = "name" -PAYLOAD_NAME_KEY = "payload" - - -@dataclass -class WsMsg: - name: str - event_type: str - payload: Any - event_id: int = -1 - - def json(self): - return { - NAME_NAME_KEY: self.name, - EVENT_TYPE_NAME_KEY: self.event_type, - EVENT_ID_NAME_KEY: self.event_id, - PAYLOAD_NAME_KEY: self.payload, - } - # singleton class ClientConn(metaclass=Singleton): http: httpx.Client = None __ws_client: websocket.WebSocketApp = None # _websocket_client - __ws_subscription = Registry("__client_ws_subscription") # { event-type-name : list(Callable)} + __ws_subscription = defaultdict(lambda: {}) # default to no subscribers + + def ws_subscribe(event_type_name: str, name: str = None): + """let a function subscribe to ws messages with event type name. + !!! dfor inner APIs only, do not use this in your code! + !!! developers should contorl blocking on their own functions + + Args: + function (Callable): who is subscribing the event type + event_type_name (str, optional): Which event to listen. Defaults to None. + """ + return functools.partial( + ClientConn._ws_subscribe, event_type_name=event_type_name, name=name + ) + + def _ws_subscribe(function: Callable, event_type_name: str, name=None): + name = name or function.__name__ + ClientConn.__ws_subscription[event_type_name][name] = function + logger.debug(f"ws: {name} subscribed to '{event_type_name}") def __init__(self) -> None: def __load_http_client(): @@ -72,7 +67,7 @@ def _init_ws(): # create websocket app logger.log(f"creating websocket connection to {ClientConn.ws_server_addr}") - ws = websocket.WebSocketApp( + ClientConn.wsApp = websocket.WebSocketApp( ClientConn.ws_server_addr, on_open=ClientConn.__on_ws_open, on_message=ClientConn.__on_ws_message, @@ -80,7 +75,7 @@ def _init_ws(): on_close=ClientConn.__on_ws_close, ) - Thread(target=ws.run_forever, kwargs={"reconnect": True}, daemon=True).start() + Thread(target=ClientConn.wsApp.run_forever, kwargs={"reconnect": True}, daemon=True).start() # assign self to websocket log writer from neetbox.logging._writer import _assign_connection_to_WebSocketLogWriter @@ -101,8 +96,12 @@ def __on_ws_open(ws: websocket.WebSocketApp): default=str, ) ) - logger.ok(f"handshake succeed.") - ClientConn.__ws_client = ws + + @ClientConn.ws_subscribe(event_type_name="handshake") + def _handle_handshake(msg): + assert msg[PAYLOAD_NAME_KEY]["result"] == 200 + logger.ok(f"handshake succeed.") + ClientConn.__ws_client = ws def __on_ws_err(ws: websocket.WebSocketApp, msg): logger.err(f"client websocket encountered {msg}") @@ -112,7 +111,7 @@ def __on_ws_close(ws: websocket.WebSocketApp, close_status_code, close_msg): if close_status_code or close_msg: logger.warn(f"ws close status code: {close_status_code}") logger.warn("ws close message: {close_msg}") - ClientConn.__ws_client = None + ClientConn.__ws_client = None def __on_ws_message(ws: websocket.WebSocketApp, msg): """EXAMPLE JSON @@ -122,20 +121,21 @@ def __on_ws_message(ws: websocket.WebSocketApp, msg): "payload": ... } """ + msg = json.loads(msg) # message should be json logger.debug(f"ws received {msg}") - # message should be json + event_type_name = msg[EVENT_TYPE_NAME_KEY] if event_type_name not in ClientConn.__ws_subscription: logger.warn( f"Client received a(n) {event_type_name} event but nobody subscribes it. Ignoring anyway." ) - for subscriber in ClientConn._ws_subscribe[event_type_name]: + for name, subscriber in ClientConn.__ws_subscription[event_type_name].items(): try: subscriber(msg) # pass payload message into subscriber except Exception as e: # subscriber throws error logger.err( - f"Subscriber {subscriber} crashed on message event {event_type_name}, ignoring." + f"Subscriber {name} crashed on message event {event_type_name}, ignoring." ) def ws_send(event_type: str, payload): @@ -154,23 +154,6 @@ def ws_send(event_type: str, payload): else: logger.debug("ws client not exist, message dropped.") - def ws_subscribe(event_type_name: str): - """let a function subscribe to ws messages with event type name. - !!! dfor inner APIs only, do not use this in your code! - !!! developers should contorl blocking on their own functions - - Args: - function (Callable): who is subscribing the event type - event_type_name (str, optional): Which event to listen. Defaults to None. - """ - return functools.partial(ClientConn._ws_subscribe, event_type_name=event_type_name) - - def _ws_subscribe(function: Callable, event_type_name: str): - if event_type_name not in ClientConn.__ws_subscription: - # create subscriber list for event-type name if not exist - ClientConn.__ws_subscription._register([], event_type_name) - ClientConn.__ws_subscription[event_type_name].append(function) - # singleton ClientConn() # __init__ setup http client only diff --git a/neetbox/daemon/client/_client_apis.py b/neetbox/daemon/client/_client_apis.py index efe1fa1..3f9d917 100644 --- a/neetbox/daemon/client/_client_apis.py +++ b/neetbox/daemon/client/_client_apis.py @@ -6,7 +6,7 @@ from neetbox.config import get_module_level_config -from neetbox.daemon.client._connection import connection +from neetbox.daemon.client._client import connection from neetbox.logging import logger from neetbox.utils import pkg from neetbox.utils.framing import get_frame_module_traceback diff --git a/neetbox/daemon/client/_daemon_client.py b/neetbox/daemon/client/_update_thread.py similarity index 98% rename from neetbox/daemon/client/_daemon_client.py rename to neetbox/daemon/client/_update_thread.py index 6ffa701..228c4d7 100644 --- a/neetbox/daemon/client/_daemon_client.py +++ b/neetbox/daemon/client/_update_thread.py @@ -10,7 +10,7 @@ from typing import Union from neetbox.config import get_module_level_config -from neetbox.daemon.client._connection import connection +from neetbox.daemon.client._client import connection from neetbox.daemon.server._server import CLIENT_API_ROOT from neetbox.logging import logger from neetbox.pipeline._signal_and_slot import _update_value_dict diff --git a/neetbox/daemon/readme.md b/neetbox/daemon/readme.md index d977b70..37cd60d 100644 --- a/neetbox/daemon/readme.md +++ b/neetbox/daemon/readme.md @@ -1,13 +1,29 @@ -# DAEMON readme +# Testing DAEMON -## How to run server only +NEETBOX daemon consists of client side and server side. While client side syncs status of running project and platform information including hardware, server side provides apis for status monitoring and websocket forcasting between client and frontends. + +Basically neetbox will also launch a backend on localhost when a project launching configured with daemon server address at localhost. The server will run in background without any output, and you may want to run a server with output for debug purposes. + +## How to test neetbox server at neetbox project root: + ```bash python neetbox/daemon/server/_server.py ``` -## WS message standard +script above should launch a server in debug mode on `localhost:5000`, it wont read the port in `neetbox.toml`. a swegger UI is provided at [localhost:5000/docs](http://127.0.0.1:5000/docs) in debug mode. websocket server should run on port `5001`. + +If you want to simulate a basic neetbox client sending message to server, at neetbox project root: + +```bash +cd tests/client +python test.py +``` + +script above should launch a simple case of neetbox project with some logs and status sending to server. + +## Websocket message standard websocke messages are described in json. There is a dataclass representing websocket message: @@ -96,7 +112,8 @@ frontend send action request to server, and server will forwards the message to "event-type" : "action", "name": "project name", "payload" : { - "action" : {...json representing action trigger...} + "name" : , + "args" : {...arg names and values...} }, "event-id" : x } @@ -110,15 +127,23 @@ cli execute action query(s) from frontend, and gives response by sending ack: ```json { - "event-type" : "ack", + "event-type" : "action", "name": "project name", "payload" : { - "action" : {...json representing action result...} + "name" : , + "result" : }, "event-id" : x } ``` +> CAUTION ! +> +> - frontend should look for list of actions via `/status` api instead of websocket. +> - when **frontend** receive websocket message with `event-type` = `action`, it must be the action result returned from client. +> - when **client** receive websocket message with `event-type` = `action`, it must be the action queried by frontend. +> - only actions with `blocking` = `true` could return result to frontend. + where `event-id` is same as received action query. --- diff --git a/neetbox/daemon/server/_server.py b/neetbox/daemon/server/_server.py index 49a8fed..d73ad77 100644 --- a/neetbox/daemon/server/_server.py +++ b/neetbox/daemon/server/_server.py @@ -11,38 +11,21 @@ from threading import Thread from typing import Any, Dict, Tuple +if __name__ == "__main__": + import ultraimport # if run server solely, sssssssuse relative import, do not trigger neetbox init + + _protocol = ultraimport("__dir__/../_protocol.py") + from _protocol import * +else: + from neetbox.daemon._protocol import * import setproctitle -from flask import Flask, abort, json, request +from flask import abort, json, request from websocket_server import WebsocketServer __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC = 60 * 60 * 12 # 12 Hours __COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC -__DAEMON_NAME = "NEETBOX DAEMON" -setproctitle.setproctitle(__DAEMON_NAME) - -FRONTEND_API_ROOT = "/web" -CLIENT_API_ROOT = "/cli" - -EVENT_TYPE_NAME_KEY = "event-type" -EVENT_ID_NAME_KEY = "event-id" -PAYLOAD_NAME_KEY = "payload" -NAME_NAME_KEY = "name" - - -@dataclass -class WsMsg: - name: str - event_type: str - payload: Any - event_id: int = -1 - - def json(self): - return { - NAME_NAME_KEY: self.name, - EVENT_TYPE_NAME_KEY: self.event_type, - EVENT_ID_NAME_KEY: self.event_id, - PAYLOAD_NAME_KEY: self.payload, - } +__PROC_NAME = "NEETBOX SERVER" +setproctitle.setproctitle(__PROC_NAME) def daemon_process(cfg, debug=False): @@ -69,11 +52,18 @@ def ws_send(self): # =============================================================== if debug: + print("Running with debug, using APIFlask") from apiflask import APIFlask - app = APIFlask(__name__) + app = APIFlask(__PROC_NAME) else: - app = Flask(__name__) + print("Running in production mode, escaping APIFlask") + from flask import Flask + + app = Flask(__PROC_NAME) + + # app = APIFlask(__PROC_NAME) + # websocket server ws_server = WebsocketServer(port=cfg["port"] + 1) __BRIDGES = {} # manage connections @@ -146,7 +136,6 @@ def handle_ws_disconnect(client, server): def handle_ws_message(client, server: WebsocketServer, message): message = json.loads(message) - print(message) # debug # handle event-type _event_type = message[EVENT_TYPE_NAME_KEY] _payload = message[PAYLOAD_NAME_KEY] @@ -155,17 +144,21 @@ def handle_ws_message(client, server: WebsocketServer, message): if _event_type == "handshake": # handle handshake # assign this client to a Bridge _who = _payload["who"] + print(f"handling handshake for {_who} with name {_project_name}") if _who == "web": # new connection from frontend # check if Bridge with name exist if _project_name not in __BRIDGES: # there is no such bridge server.send_message( client=client, - msg=WsMsg( - event_type="ack", - event_id=_event_id, - payload={"result": "404", "reason": "name not found"}, - ).json(), + msg=json.dumps( + WsMsg( + name=_project_name, + event_type="handshake", + event_id=_event_id, + payload={"result": 404, "reason": "name not found"}, + ).json() + ), ) else: # assign web to bridge _target_bridge = __BRIDGES[_project_name] @@ -173,11 +166,14 @@ def handle_ws_message(client, server: WebsocketServer, message): connected_clients[client["id"]] = (_project_name, "web") server.send_message( client=client, - msg=WsMsg( - event_type="ack", - event_id=_event_id, - payload={"result": "200", "reason": "join success"}, - ).json(), + msg=json.dumps( + WsMsg( + name=_project_name, + event_type="handshake", + event_id=_event_id, + payload={"result": 200, "reason": "join success"}, + ).json() + ), ) elif _who == "cli": # new connection from cli @@ -189,12 +185,14 @@ def handle_ws_message(client, server: WebsocketServer, message): connected_clients[client["id"]] = (_project_name, "web") server.send_message( client=client, - msg=WsMsg( - name="_project_name", - event_type="ack", - event_id=_event_id, - payload={"result": "200", "reason": "join success"}, - ).json(), + msg=json.dumps( + WsMsg( + name=_project_name, + event_type="handshake", + event_id=_event_id, + payload={"result": 200, "reason": "join success"}, + ).json() + ), ) elif _event_type == "log": # handle log @@ -292,14 +290,15 @@ def _count_down_thread(): count_down_thread.start() ws_server.run_forever(threaded=True) - app.run(host="0.0.0.0", port=cfg["port"], debug=debug) + + app.run(host="0.0.0.0", port=cfg["port"]) if __name__ == "__main__": cfg = { "enable": True, "host": "localhost", - "port": 20202, + "port": 5000, "displayName": None, "allowIpython": False, "mute": True, diff --git a/neetbox/logging/_writer.py b/neetbox/logging/_writer.py index 0db2958..8f6c69b 100644 --- a/neetbox/logging/_writer.py +++ b/neetbox/logging/_writer.py @@ -157,7 +157,7 @@ def write(self, raw_log: RawLog): class _WebSocketLogWriter(LogWriter): # class level statics - connection = None # connection should be assigned by neetbox.daemon.client._connection to avoid recursive import + connection = None # connection should be assigned by neetbox.daemon.client._client to avoid recursive import def write(self, raw_log: RawLog): json_data = raw_log.json() diff --git a/pyproject.toml b/pyproject.toml index 46c49a1..b3215bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ black = "^23.9.1" isort = "^5.11.5" pre-commit = "^3" apiflask = "^2.0.2" +ultraimport = "^0.0.7" [tool.poetry.extras] torch = ["torch", "torchvision", "torchaudio"] diff --git a/tests/client/readme.md b/tests/client/readme.md new file mode 100644 index 0000000..e04c557 --- /dev/null +++ b/tests/client/readme.md @@ -0,0 +1,3 @@ +# What is this + +this is a single file simulation representing a common case of usage of neetbox. Edit `neetbox.toml` and run `test.py` performs a general test on some neetbox behaviors. diff --git a/tests/client/test.py b/tests/client/test.py new file mode 100644 index 0000000..b121f2b --- /dev/null +++ b/tests/client/test.py @@ -0,0 +1,46 @@ +import os +from random import random +from time import sleep + +from neetbox.daemon import action +from neetbox.integrations.environment import hardware, platform +from neetbox.logging import logger +from neetbox.pipeline import listen, watch + + +@watch("train", initiative=True) +def train(epoch): + loss, acc = random(), random() + return {"loss": loss, "acc": acc} + + +@listen("train") +def print_to_console(metrix): + logger.log(f"metrix from train: {metrix}") + + +@action(name="action-1") +def action_1(text): + logger.log(f"action 1 triggered. text = {text}") + + +@action(name="action-2") +def action_2(text1, text2): + logger.log(f"action 2 triggered. text1 = {text1}, text2 = {text2}") + + +@action(name="wait-for-sec", blocking=True) +def action_2(sec): + sec = int(sec) + logger.log(f"wait for {sec} sec.") + + +@action(name="shutdown", blocking=True) +def sys_exit(): + logger.log("shutdown received, shutting down immediately.") + os._exit(0) + + +for i in range(99999): + sleep(1) + train(i)