From 8388c4d875415cf68f6ccdaec5d46ff1f76aecb5 Mon Sep 17 00:00:00 2001 From: PuQing Date: Tue, 21 Nov 2023 22:08:12 +0800 Subject: [PATCH 1/2] fix code style --- .flake8 | 1 + neetbox/__init__.py | 9 +++---- neetbox/integrations/engine.py | 2 +- neetbox/integrations/environment/hardware.py | 28 +++++++++----------- neetbox/integrations/environment/platform.py | 8 ++---- neetbox/integrations/resource.py | 16 ++++------- neetbox/pipeline/_signal_and_slot.py | 28 +++++++------------- 7 files changed, 34 insertions(+), 58 deletions(-) diff --git a/.flake8 b/.flake8 index 7da1f96..ad98539 100644 --- a/.flake8 +++ b/.flake8 @@ -1,2 +1,3 @@ [flake8] max-line-length = 100 +extend-ignore = E203 diff --git a/neetbox/__init__.py b/neetbox/__init__.py index 9c9d08f..9dfcf9e 100644 --- a/neetbox/__init__.py +++ b/neetbox/__init__.py @@ -8,7 +8,7 @@ from neetbox.daemon import _try_attach_daemon from neetbox.utils.framing import get_frame_module_traceback -module = get_frame_module_traceback(1).__name__ +module = get_frame_module_traceback(1).__name__ # type: ignore config_file_name = f"{module}.toml" @@ -36,9 +36,7 @@ def init(path=None, load=False, **kwargs) -> bool: if "name" in kwargs and kwargs["name"]: # using given name _config["name"] = kwargs["name"] else: # using the folder name - _config["name"] = os.path.basename( - os.path.normpath(os.getcwd()) - ) + _config["name"] = os.path.basename(os.path.normpath(os.getcwd())) toml.dump(_config, config_file) logger.ok(f"Workspace config created as {config_file_path}.") return True @@ -75,8 +73,7 @@ def init(path=None, load=False, **kwargs) -> bool: is_in_daemon_process = ( - "NEETBOX_DAEMON_PROCESS" in os.environ - and os.environ["NEETBOX_DAEMON_PROCESS"] == "1" + "NEETBOX_DAEMON_PROCESS" in os.environ and os.environ["NEETBOX_DAEMON_PROCESS"] == "1" ) # print('prevent_daemon_loading =', is_in_daemon_process) if os.path.isfile(config_file_name) and not is_in_daemon_process: # if in a workspace diff --git a/neetbox/integrations/engine.py b/neetbox/integrations/engine.py index 0b0b0bc..fd17b90 100644 --- a/neetbox/integrations/engine.py +++ b/neetbox/integrations/engine.py @@ -42,6 +42,6 @@ def get_installed_engines(): importlib.import_module(engine.value) installed_engines.append(engine) logger.info(f"'{engine.vaule}' was found installed.") - except: + except ImportError: pass return installed_engines.copy() diff --git a/neetbox/integrations/environment/hardware.py b/neetbox/integrations/environment/hardware.py index e83c765..c9ce522 100644 --- a/neetbox/integrations/environment/hardware.py +++ b/neetbox/integrations/environment/hardware.py @@ -5,16 +5,6 @@ # Date: 20230413 -from neetbox.utils import pkg -from neetbox.utils.framing import get_frame_module_traceback - -module_name = get_frame_module_traceback().__name__ -assert pkg.is_installed( - "psutil", try_install_if_not=True -), f"{module_name} requires psutil which is not installed" -assert pkg.is_installed( - "GPUtil", try_install_if_not=True -), f"{module_name} requires GPUtil which is not installed" import time from threading import Thread @@ -23,8 +13,18 @@ from GPUtil import GPU from neetbox.pipeline import watch +from neetbox.utils import pkg +from neetbox.utils.framing import get_frame_module_traceback from neetbox.utils.mvc import Singleton +module_name = get_frame_module_traceback().__name__ # type: ignore +assert pkg.is_installed( + "psutil", try_install_if_not=True +), f"{module_name} requires psutil which is not installed" +assert pkg.is_installed( + "GPUtil", try_install_if_not=True +), f"{module_name} requires GPUtil which is not installed" + class _CPU_STAT(dict): def __init__(self, id=-1, percent=0.0, freq=0.0) -> None: @@ -97,15 +97,11 @@ def watcher_fun(env_instance: _Hardware, do_update_gpus: bool): freq=cpu_freq[index], ) if do_update_gpus: - env_instance["gpus"] = [ - _GPU_STAT.parse(_gpu) for _gpu in GPUtil.getGPUs() - ] + env_instance["gpus"] = [_GPU_STAT.parse(_gpu) for _gpu in GPUtil.getGPUs()] env_instance[""] = psutil.cpu_stats() time.sleep(env_instance._update_interval) - self._watcher = Thread( - target=watcher_fun, args=(self, self._with_gpu), daemon=True - ) + self._watcher = Thread(target=watcher_fun, args=(self, self._with_gpu), daemon=True) self._watcher.start() diff --git a/neetbox/integrations/environment/platform.py b/neetbox/integrations/environment/platform.py index 1eda763..f05d980 100644 --- a/neetbox/integrations/environment/platform.py +++ b/neetbox/integrations/environment/platform.py @@ -18,9 +18,7 @@ def __init__(self): # system self["username"] = getpass.getuser() self["machine"] = platform.machine() - self["processor"] = ( - "unknown" if len(platform.processor()) == 0 else platform.processor() - ) + self["processor"] = "unknown" if len(platform.processor()) == 0 else platform.processor() self["os_name"] = platform.system() self["os_release"] = platform.version() self["architecture"] = platform.architecture() @@ -39,9 +37,7 @@ def exec(self, command): str: The command running results. err: The command error information. """ - p = subprocess.Popen( - command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True - ) + p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) raw_output, raw_err = p.communicate() rc = p.returncode if self.platform_info["architecture"] == "32bit": diff --git a/neetbox/integrations/resource.py b/neetbox/integrations/resource.py index fd2bc6f..0b82dc1 100644 --- a/neetbox/integrations/resource.py +++ b/neetbox/integrations/resource.py @@ -31,9 +31,7 @@ from neetbox.logging import logger from neetbox.utils import pkg -_loader_pool: Dict[ - str, "ResourceLoader" -] = dict() # all ResourceLoaders are stored here +_loader_pool: Dict[str, "ResourceLoader"] = dict() # all ResourceLoaders are stored here class ResourceLoader: @@ -104,9 +102,7 @@ def perform_scan(): glob_str = "**/*" if self._scan_sub_dirs else "*" if not verbose: # do not output self.file_path_list = [ - str(path) - for path in pathlib.Path(self.path).glob(glob_str) - if can_match(path) + str(path) for path in pathlib.Path(self.path).glob(glob_str) if can_match(path) ] else: self.file_path_list = [] @@ -175,7 +171,7 @@ def get_random_image_as_numpy(self): return np.array(image) def get_random_image_as_tensor(self, engine=engine.Torch): - assert engine in [engine.Torch] # todo support other engines + assert engine in [engine.Torch] # TODO support other engines if engine == engine.Torch: assert pkg.is_installed("torchvision") import torchvision.transforms as T @@ -186,12 +182,10 @@ def get_random_image_as_tensor(self, engine=engine.Torch): T.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)), ] ) - image = tensor_transform(self.get_random_image()).unsqueeze( - 0 - ) # To tensor of NCHW + image = tensor_transform(self.get_random_image()).unsqueeze(0) # To tensor of NCHW return image - # todo to_dataset + # TODO(VisualDust): to_dataset def download( diff --git a/neetbox/pipeline/_signal_and_slot.py b/neetbox/pipeline/_signal_and_slot.py index ce277be..945c799 100644 --- a/neetbox/pipeline/_signal_and_slot.py +++ b/neetbox/pipeline/_signal_and_slot.py @@ -9,7 +9,7 @@ from datetime import datetime from functools import partial from threading import Thread -from typing import Any, Callable +from typing import Any, Callable, Optional, Union from neetbox.config import get_module_level_config from neetbox.core import Registry @@ -75,13 +75,11 @@ def _so_update_and_ping_listen(_name, _value, _watch_config): f"Watched value {_name} takes longer time({delta_t:.8f}s) to update than it was expected({expected_time_limit}s)." ) - Thread( - target=_so_update_and_ping_listen, args=(name, _the_value, _watch_config) - ).start() + Thread(target=_so_update_and_ping_listen, args=(name, _the_value, _watch_config)).start() return _the_value -def _watch(func: Callable, name: str, freq: float, initiative=False, force=False): +def _watch(func: Callable, name: Optional[str], freq: float, initiative=False, force=False): """Function decorator to let the daemon watch a value of the function Args: @@ -96,12 +94,8 @@ def _watch(func: Callable, name: str, freq: float, initiative=False, force=False ), force=force, ) - if ( - initiative - ): # initiatively update the value dict when the function was called manually - logger.log( - f"added {name} to daemon monitor. It will update on each call of the function." - ) + if initiative: # initiatively update the value dict when the function was called manually + logger.log(f"added {name} to daemon monitor. It will update on each call of the function.") return partial(__update_and_get, name) else: logger.log( @@ -118,9 +112,9 @@ def watch(name=None, freq=None, initiative=False, force=False): return partial(_watch, name=name, freq=freq, initiative=initiative, force=force) -def _listen(func: Callable, target: str, name: str = None, force=False): +def _listen(func: Callable, target: Union[str, Callable], name: Optional[str] = None, force=False): name = name or func.__name__ - if type(target) is not str: + if not isinstance(target, str): if type(target) is partial: if target.func in [__update_and_get, __get]: target = target.args[0] @@ -142,7 +136,7 @@ def _listen(func: Callable, target: str, name: str = None, force=False): return func -def listen(target, name: str = None, force=False): +def listen(target, name: Optional[str] = None, force=False): return partial(_listen, target=target, name=name, force=force) @@ -154,10 +148,8 @@ def _update_thread(): time.sleep(__TIME_UNIT_SEC) for _vname, _watched_fun in _watch_queue_dict.items(): _watch_config = _watched_fun.others - if ( - not _watch_config["initiative"] and _ctr % _watch_config["freq"] == 0 - ): # do update - _the_value = __update_and_get(_vname) + if not _watch_config["initiative"] and _ctr % _watch_config["freq"] == 0: # do update + _ = __update_and_get(_vname) update_thread = Thread(target=_update_thread, daemon=True) From 89606dd1306e6ebcd73160a523e7d9c1abbce3a7 Mon Sep 17 00:00:00 2001 From: PuQing Date: Thu, 23 Nov 2023 11:23:49 +0800 Subject: [PATCH 2/2] Merge branch 'dev' of https://github.com/visualDust/neetbox --- neetbox/daemon/_agent.py | 70 ++++++++++++++++++++++++++++ neetbox/pipeline/_signal_and_slot.py | 6 +-- 2 files changed, 73 insertions(+), 3 deletions(-) create mode 100644 neetbox/daemon/_agent.py diff --git a/neetbox/daemon/_agent.py b/neetbox/daemon/_agent.py new file mode 100644 index 0000000..0e58bd7 --- /dev/null +++ b/neetbox/daemon/_agent.py @@ -0,0 +1,70 @@ +import functools +import inspect +from ast import literal_eval +from typing import Callable, Optional + +from neetbox.core import Registry +from neetbox.logging import logger +from neetbox.utils.mvc import Singleton + + +class PackedAction(Callable): + def __init__(self, function: Callable, name=None, **kwargs): + super().__init__(**kwargs) + self.function = function + self.name = name if name else function.__name__ + self.argspec = inspect.getfullargspec(self.function) + + def __call__(self, **argv): + self.function(argv) + + def eval_call(self, params: dict): + eval_params = dict((k, literal_eval(v)) for k, v in params.items()) + return self.function(**eval_params) + + +class _NeetAction(metaclass=Singleton): + __ACTION_POOL: Registry = Registry("__NEET_ACTIONS") + + def register( + self, + *, + name: Optional[str] = None, + ): + return functools.partial(self._register, name=name) + + def _register(self, function: Callable, name: str = None): + packed = PackedAction(function=function, name=name) + _NeetAction.__ACTION_POOL._register(what=packed, name=packed.name, force=True) + return function + + def get_actions(self): + action_names = _NeetAction.__ACTION_POOL.keys() + actions = {} + for n in action_names: + actions[n] = _NeetAction.__ACTION_POOL[n].argspec + return actions + + def eval_call(self, name: str, params: dict): + if name not in _NeetAction.__ACTION_POOL: + logger.err(f"Could not find action with name {name}, action stopped.") + return False + return _NeetAction.__ACTION_POOL[name].eval_call(params) + + +# singleton +neet_action = _NeetAction() + + +# example +if __name__ == "__main__": + + @neet_action.register(name="some") + def some(a, b): + print(a, b) + + print("registered actions:") + print(neet_action.get_actions()) + + print("calling 'some") + neet_action.eval_call("some", {"a": "3", "b": "4"}) diff --git a/neetbox/pipeline/_signal_and_slot.py b/neetbox/pipeline/_signal_and_slot.py index 945c799..ab6992e 100644 --- a/neetbox/pipeline/_signal_and_slot.py +++ b/neetbox/pipeline/_signal_and_slot.py @@ -36,9 +36,9 @@ def __init__(self, name, freq, initiative=False): class _WatchedFun: - def __init__(self, func, others) -> None: + def __init__(self, func, watch_cfg) -> None: self.func = func - self.others = others + self.others = watch_cfg def __call__(self, *args: Any, **kwds: Any) -> Any: return self.func(*args, **kwds) @@ -90,7 +90,7 @@ def _watch(func: Callable, name: Optional[str], freq: float, initiative=False, f name=name, what=_WatchedFun( func=func, - others=_WatchConfig(name, freq=freq, initiative=initiative), + watch_cfg=_WatchConfig(name, freq=freq, initiative=initiative), ), force=force, )