From 43e15206557dc0f9ac2094cc1f7888ed98754878 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Mon, 2 Dec 2024 00:56:52 +0100 Subject: [PATCH] Pause/Play/Status all using message builder --- docs/source/tutorial.ipynb | 2 +- src/plumpy/process_comms.py | 60 +++++++++++++++++++++++++------------ 2 files changed, 42 insertions(+), 20 deletions(-) diff --git a/docs/source/tutorial.ipynb b/docs/source/tutorial.ipynb index c1fdb3b2..fe25892d 100644 --- a/docs/source/tutorial.ipynb +++ b/docs/source/tutorial.ipynb @@ -1118,7 +1118,7 @@ "\n", "process = SimpleProcess(communicator=communicator)\n", "\n", - "pprint(communicator.rpc_send(str(process.pid), plumpy.STATUS_MSG).result())" + "pprint(communicator.rpc_send(str(process.pid), plumpy.StatusMessage.build()).result())" ] }, { diff --git a/src/plumpy/process_comms.py b/src/plumpy/process_comms.py index bc2fa125..dee47e3c 100644 --- a/src/plumpy/process_comms.py +++ b/src/plumpy/process_comms.py @@ -12,9 +12,9 @@ from .utils import PID_TYPE __all__ = [ - 'PAUSE_MSG', - 'PLAY_MSG', - 'STATUS_MSG', + 'PauseMessage', + 'PlayMessage', + 'StatusMessage', 'KillMessage', 'ProcessLauncher', 'RemoteProcessController', @@ -45,10 +45,27 @@ class Intent: MessageType = dict[str, Any] -PAUSE_MSG: MessageType = {INTENT_KEY: Intent.PAUSE, MESSAGE_KEY: None} -PLAY_MSG: MessageType = {INTENT_KEY: Intent.PLAY, MESSAGE_KEY: None} -# KILL_MSG: MessageType = {INTENT_KEY: Intent.KILL, MESSAGE_KEY: None, FORCE_KILL_KEY: False} -STATUS_MSG: MessageType = {INTENT_KEY: Intent.STATUS, MESSAGE_KEY: None} +# PAUSE_MSG: MessageType = {INTENT_KEY: Intent.PAUSE, MESSAGE_KEY: None} +# PLAY_MSG: MessageType = {INTENT_KEY: Intent.PLAY, MESSAGE_KEY: None} +# STATUS_MSG: MessageType = {INTENT_KEY: Intent.STATUS, MESSAGE_KEY: None} + + +class PlayMessage: + @classmethod + def build(cls, message: str | None = None) -> MessageType: + return { + INTENT_KEY: Intent.PLAY, + MESSAGE_KEY: message, + } + + +class PauseMessage: + @classmethod + def build(cls, message: str | None = None) -> MessageType: + return { + INTENT_KEY: Intent.PAUSE, + MESSAGE_KEY: message, + } class KillMessage: @@ -61,6 +78,15 @@ def build(cls, message: str | None = None, force: bool = False) -> MessageType: } +class StatusMessage: + @classmethod + def build(cls, message: str | None = None) -> MessageType: + return { + INTENT_KEY: Intent.STATUS, + MESSAGE_KEY: message, + } + + TASK_KEY = 'task' TASK_ARGS = 'args' PERSIST_KEY = 'persist' @@ -176,7 +202,7 @@ async def get_status(self, pid: 'PID_TYPE') -> 'ProcessStatus': :param pid: the process id :return: the status response from the process """ - future = self._communicator.rpc_send(pid, STATUS_MSG) + future = self._communicator.rpc_send(pid, StatusMessage.build()) result = await asyncio.wrap_future(future) return result @@ -188,11 +214,9 @@ async def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> 'Pr :param msg: optional pause message :return: True if paused, False otherwise """ - message = copy.copy(PAUSE_MSG) - if msg is not None: - message[MESSAGE_KEY] = msg + msg = PauseMessage.build(message=msg) - pause_future = self._communicator.rpc_send(pid, message) + pause_future = self._communicator.rpc_send(pid, msg) # rpc_send return a thread future from communicator future = await asyncio.wrap_future(pause_future) # future is just returned from rpc call which return a kiwipy future @@ -206,7 +230,7 @@ async def play_process(self, pid: 'PID_TYPE') -> 'ProcessResult': :param pid: the pid of the process to play :return: True if played, False otherwise """ - play_future = self._communicator.rpc_send(pid, PLAY_MSG) + play_future = self._communicator.rpc_send(pid, PlayMessage.build()) future = await asyncio.wrap_future(play_future) result = await asyncio.wrap_future(future) return result @@ -344,7 +368,7 @@ def get_status(self, pid: 'PID_TYPE') -> kiwipy.Future: :param pid: the process id :return: the status response from the process """ - return self._communicator.rpc_send(pid, STATUS_MSG) + return self._communicator.rpc_send(pid, StatusMessage.build()) def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Future: """ @@ -355,11 +379,9 @@ def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Fu :return: a response future from the process to be paused """ - message = copy.copy(PAUSE_MSG) - if msg is not None: - message[MESSAGE_KEY] = msg + msg = PauseMessage.build(message=msg) - return self._communicator.rpc_send(pid, message) + return self._communicator.rpc_send(pid, msg) def pause_all(self, msg: Any) -> None: """ @@ -377,7 +399,7 @@ def play_process(self, pid: 'PID_TYPE') -> kiwipy.Future: :return: a response future from the process to be played """ - return self._communicator.rpc_send(pid, PLAY_MSG) + return self._communicator.rpc_send(pid, PlayMessage.build()) def play_all(self) -> None: """