From 7aba1e8e4a240be4cf38440a4b5b4e5565dd7e04 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sat, 21 Dec 2024 12:16:41 +0100 Subject: [PATCH] Typing todo note --- src/plumpy/coordinator.py | 2 ++ src/plumpy/processes.py | 10 ++++++---- src/plumpy/rmq/process_control.py | 2 ++ 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/plumpy/coordinator.py b/src/plumpy/coordinator.py index 29533bf4..dc501c4e 100644 --- a/src/plumpy/coordinator.py +++ b/src/plumpy/coordinator.py @@ -45,3 +45,5 @@ def broadcast_send( ) -> Any: ... def task_send(self, task: Any, no_reply: bool = False) -> Any: ... + + def close(self) -> None: ... diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index 26fe2b91..9a7f4781 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -323,6 +323,7 @@ def init(self) -> None: self.add_cleanup(functools.partial(self._coordinator.remove_rpc_subscriber, identifier)) except concurrent.futures.TimeoutError: self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid) + # XXX: handle duplicate subscribing here: see aiida-core test_duplicate_subscriber_identifier. try: # filter out state change broadcasts @@ -1032,6 +1033,7 @@ async def run_callback() -> None: except Exception as exc: import traceback import inspect + # Get traceback as a string tb_str = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__)) @@ -1042,15 +1044,15 @@ async def run_callback() -> None: source_file = inspect.getfile(callback) # getsourcelines returns a tuple (list_of_source_lines, starting_line_number) _, start_line = inspect.getsourcelines(callback) - callback_location = f"{source_file}:{start_line}" + callback_location = f'{source_file}:{start_line}' except Exception: - callback_location = "" + callback_location = '' # Include the callback name, file/line info, and the full traceback in the message raise RuntimeError( f"Error invoking callback '{callback.__name__}' at {callback_location}.\n" - f"Exception: {type(exc).__name__}: {exc}\n\n" - f"Full Traceback:\n{tb_str}" + f'Exception: {type(exc).__name__}: {exc}\n\n' + f'Full Traceback:\n{tb_str}' ) from exc else: while asyncio.isfuture(result): diff --git a/src/plumpy/rmq/process_control.py b/src/plumpy/rmq/process_control.py index 74595dec..bdf95ee7 100644 --- a/src/plumpy/rmq/process_control.py +++ b/src/plumpy/rmq/process_control.py @@ -29,6 +29,7 @@ ProcessStatus = Any +# FIXME: the class not fit typing of ProcessController protocol class RemoteProcessController: """ Control remote processes using coroutines that will send messages and wait @@ -189,6 +190,7 @@ async def execute_process( return result +# FIXME: the class not fit typing of ProcessController protocol class RemoteProcessThreadController: """ A class that can be used to control and launch remote processes