Skip to content

Commit

Permalink
now client answers action query
Browse files Browse the repository at this point in the history
  • Loading branch information
visualDust committed Nov 25, 2023
1 parent 1504c0a commit 9fefd96
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 4 deletions.
19 changes: 18 additions & 1 deletion neetbox/daemon/client/_action_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from typing import Callable, Optional

from neetbox.core import Registry
from neetbox.daemon._protocol import *
from neetbox.daemon.client._client import connection
from neetbox.logging import logger
from neetbox.pipeline import watch
from neetbox.utils.mvc import Singleton
Expand Down Expand Up @@ -41,7 +43,7 @@ def get_action_dict():
action_names = _NeetActionManager.__ACTION_POOL.keys()
for name in action_names:
action = _NeetActionManager.__ACTION_POOL[name]
action_dict[name] = action.argspec.args
action_dict[name] = {"args": action.argspec.args, "blocking": action.blocking}
return action_dict

def eval_call(name: str, params: dict, callback: None):
Expand Down Expand Up @@ -83,6 +85,21 @@ def _register(function: Callable, name: str = None, blocking: bool = False):
return function


@connection.ws_subscribe(event_type_name="action")
def __listen_to_actions(msg):
_payload = msg[PAYLOAD_NAME_KEY]
_event_id = msg[EVENT_ID_NAME_KEY]
_action_name = _payload["name"]
_action_args = _payload["args"]
_NeetActionManager.eval_call(
name=_action_name,
params=_action_args,
callback=lambda x: connection.ws_send(
event_type="action", payload={"name": _action_name, "result": x}, _event_id=_event_id
),
)


# example
if __name__ == "__main__":
import time
Expand Down
1 change: 1 addition & 0 deletions neetbox/daemon/client/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def ws_subscribe(event_type_name: str, name: str = None):
def _ws_subscribe(function: Callable, event_type_name: str, name=None):
name = name or function.__name__
ClientConn.__ws_subscription[event_type_name][name] = function
logger.debug(f"ws: {name} subscribed to '{event_type_name}")

def __init__(self) -> None:
def __load_http_client():
Expand Down
17 changes: 14 additions & 3 deletions neetbox/daemon/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ python neetbox/daemon/server/_server.py
script above should launch a server in debug mode on `localhost:5000`, it wont read the port in `neetbox.toml`. a swegger UI is provided at [localhost:5000/docs](http://127.0.0.1:5000/docs) in debug mode. websocket server should run on port `5001`.

If you want to simulate a basic neetbox client sending message to server, at neetbox project root:

```bash
cd tests/client
python test.py
```

script above should launch a simple case of neetbox project with some logs and status sending to server.

## Websocket message standard
Expand Down Expand Up @@ -110,7 +112,8 @@ frontend send action request to server, and server will forwards the message to
"event-type" : "action",
"name": "project name",
"payload" : {
"action" : {...json representing action trigger...}
"name" : <name of action>,
"args" : {...arg names and values...}
},
"event-id" : x
}
Expand All @@ -124,15 +127,23 @@ cli execute action query(s) from frontend, and gives response by sending ack:

```json
{
"event-type" : "ack",
"event-type" : "action",
"name": "project name",
"payload" : {
"action" : {...json representing action result...}
"name" : <name of action>,
"result" : <returned value of cation>
},
"event-id" : x
}
```

> CAUTION !
>
> - frontend should look for list of actions via `/status` api instead of websocket.
> - when **frontend** receive websocket message with `event-type` = `action`, it must be the action result returned from client.
> - when **client** receive websocket message with `event-type` = `action`, it must be the action queried by frontend.
> - only actions with `blocking` = `true` could return result to frontend.
where `event-id` is same as received action query.

---
Expand Down
24 changes: 24 additions & 0 deletions tests/client/test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
from random import random
from time import sleep

from neetbox.daemon import action
from neetbox.integrations.environment import hardware, platform
from neetbox.logging import logger
from neetbox.pipeline import listen, watch
Expand All @@ -17,6 +19,28 @@ def print_to_console(metrix):
logger.log(f"metrix from train: {metrix}")


@action(name="action-1")
def action_1(text):
logger.log(f"action 1 triggered. text = {text}")


@action(name="action-2")
def action_2(text1, text2):
logger.log(f"action 2 triggered. text1 = {text1}, text2 = {text2}")


@action(name="wait-for-sec", blocking=True)
def action_2(sec):
sec = int(sec)
logger.log(f"wait for {sec} sec.")


@action(name="shutdown", blocking=True)
def sys_exit():
logger.log("shutdown received, shutting down immediately.")
os._exit(0)


for i in range(99999):
sleep(1)
train(i)

0 comments on commit 9fefd96

Please sign in to comment.