From 07303946c7f64ded918d9e2a9b0fda1e72a2c56e Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Fri, 31 Jan 2025 01:20:57 +0100 Subject: [PATCH] Create/Launch/Continue body into builder (#26) Using the same interface for creating all message type, create/launch/continue, play/pause/kill/status It revert the message API did by using MessageBuilder, it cannot handle `MessageBuilder.continue`, it is not a good API design. --- src/plumpy/__init__.py | 28 ++--- src/plumpy/message.py | 184 +++++++++++++++++------------- src/plumpy/process_states.py | 14 +-- src/plumpy/processes.py | 16 +-- src/plumpy/rmq/process_control.py | 49 ++++---- tests/test_message.py | 8 +- tests/test_process_states.py | 4 +- tests/test_processes.py | 5 +- tests/utils.py | 4 +- 9 files changed, 163 insertions(+), 149 deletions(-) diff --git a/src/plumpy/__init__.py b/src/plumpy/__init__.py index 4cb50820..91d54bf8 100644 --- a/src/plumpy/__init__.py +++ b/src/plumpy/__init__.py @@ -28,7 +28,7 @@ ) from .futures import CancellableAction, Future, capture_exceptions, create_task from .loaders import DefaultObjectLoader, ObjectLoader, get_object_loader, set_object_loader -from .message import MessageBuilder, ProcessLauncher, create_continue_body, create_launch_body +from .message import Message, MsgContinue, MsgCreate, MsgKill, MsgLaunch, MsgPause, MsgPlay, MsgStatus, ProcessLauncher from .persistence import ( Bundle, InMemoryPersister, @@ -64,26 +64,17 @@ from .workchains import ToContext, WorkChain, WorkChainSpec, if_, return_, while_ __all__ = ( - # ports 'UNSPECIFIED', - # utils 'AttributesDict', - # persistence 'Bundle', - # processes 'BundleKeys', - # futures 'CancellableAction', - # exceptions 'ClosedError', - # process_states/States 'Continue', - # coordinator 'Coordinator', 'CoordinatorConnectionError', 'CoordinatorTimeoutError', 'Created', - # loaders 'DefaultObjectLoader', 'Excepted', 'Finished', @@ -92,14 +83,19 @@ 'InputPort', 'Interruption', 'InvalidStateError', - # process_states/Commands 'Kill', 'KillInterruption', 'Killed', 'KilledError', 'LoadSaveContext', - # message - 'MessageBuilder', + 'Message', + 'MsgContinue', + 'MsgCreate', + 'MsgKill', + 'MsgLaunch', + 'MsgPause', + 'MsgPlay', + 'MsgStatus', 'ObjectLoader', 'OutputPort', 'PauseInterruption', @@ -107,16 +103,13 @@ 'PersistenceError', 'Persister', 'PicklePersister', - # event 'PlumpyEventLoopPolicy', 'Port', 'PortNamespace', 'PortValidationError', 'Process', - # controller 'ProcessController', 'ProcessLauncher', - # process_listener 'ProcessListener', 'ProcessSpec', 'ProcessState', @@ -124,7 +117,6 @@ 'Savable', 'SavableFuture', 'Stop', - # workchain 'ToContext', 'TransitionFailed', 'UnsuccessfulResult', @@ -134,8 +126,6 @@ 'WorkChainSpec', 'auto_persist', 'capture_exceptions', - 'create_continue_body', - 'create_launch_body', 'create_task', 'get_event_loop', 'get_object_loader', diff --git a/src/plumpy/message.py b/src/plumpy/message.py index 26cad8c5..95c0d754 100644 --- a/src/plumpy/message.py +++ b/src/plumpy/message.py @@ -48,124 +48,146 @@ class Intent: LOGGER = logging.getLogger(__name__) MessageType = dict[str, Any] +Message = dict[str, Any] -class MessageBuilder: - """MessageBuilder will construct different messages that can passing over coordinator.""" - +class MsgPlay: @classmethod - def play(cls, text: str | None = None) -> MessageType: + def new(cls, text: str | None = None) -> Message: """The play message send over coordinator.""" return { INTENT_KEY: Intent.PLAY, MESSAGE_TEXT_KEY: text, } + +class MsgPause: + """ + The 'pause' message sent over a coordinator. + """ + @classmethod - def pause(cls, text: str | None = None) -> MessageType: - """The pause message send over coordinator.""" + def new(cls, text: str | None = None) -> MessageType: return { INTENT_KEY: Intent.PAUSE, MESSAGE_TEXT_KEY: text, } + +class MsgKill: + """ + The 'kill' message sent over a coordinator. + """ + @classmethod - def kill(cls, text: str | None = None, force_kill: bool = False) -> MessageType: - """The kill message send over coordinator.""" + def new(cls, text: str | None = None, force_kill: bool = False) -> MessageType: return { INTENT_KEY: Intent.KILL, MESSAGE_TEXT_KEY: text, FORCE_KILL_KEY: force_kill, } + +class MsgStatus: + """ + The 'status' message sent over a coordinator. + """ + @classmethod - def status(cls, text: str | None = None) -> MessageType: - """The status message send over coordinator.""" + def new(cls, text: str | None = None) -> MessageType: return { INTENT_KEY: Intent.STATUS, MESSAGE_TEXT_KEY: text, } -def create_launch_body( - process_class: str, - init_args: Sequence[Any] | None = None, - init_kwargs: dict[str, Any] | None = None, - persist: bool = False, - loader: loaders.ObjectLoader | None = None, - nowait: bool = True, -) -> dict[str, Any]: - """ - Create a message body for the launch action - - :param process_class: the class of the process to launch - :param init_args: any initialisation positional arguments - :param init_kwargs: any initialisation keyword arguments - :param persist: persist this process if True, otherwise don't - :param loader: the loader to use to load the persisted process - :param nowait: wait for the process to finish before completing the task, otherwise just return the PID - :return: a dictionary with the body of the message to launch the process - :rtype: dict +class MsgLaunch: """ - if loader is None: - loader = loaders.get_object_loader() - - msg_body = { - TASK_KEY: LAUNCH_TASK, - TASK_ARGS: { - PROCESS_CLASS_KEY: loader.identify_object(process_class), - PERSIST_KEY: persist, - NOWAIT_KEY: nowait, - ARGS_KEY: init_args, - KWARGS_KEY: init_kwargs, - }, - } - return msg_body - - -def create_continue_body(pid: 'PID_TYPE', tag: str | None = None, nowait: bool = False) -> dict[str, Any]: + Create the message payload for the launch action. """ - Create a message body to continue an existing process - :param pid: the pid of the existing process - :param tag: the optional persistence tag - :param nowait: wait for the process to finish before completing the task, otherwise just return the PID - :return: a dictionary with the body of the message to continue the process - """ - msg_body = {TASK_KEY: CONTINUE_TASK, TASK_ARGS: {PID_KEY: pid, NOWAIT_KEY: nowait, TAG_KEY: tag}} - return msg_body + @classmethod + def new( + cls, + process_class: str, + init_args: Sequence[Any] | None = None, + init_kwargs: dict[str, Any] | None = None, + persist: bool = False, + loader: 'loaders.ObjectLoader | None' = None, + nowait: bool = True, + ) -> dict[str, Any]: + """ + Create a message body for the launch action + """ + if loader is None: + loader = loaders.get_object_loader() + + return { + TASK_KEY: LAUNCH_TASK, + TASK_ARGS: { + PROCESS_CLASS_KEY: loader.identify_object(process_class), + PERSIST_KEY: persist, + NOWAIT_KEY: nowait, + ARGS_KEY: init_args, + KWARGS_KEY: init_kwargs, + }, + } -def create_create_body( - process_class: str, - init_args: Sequence[Any] | None = None, - init_kwargs: dict[str, Any] | None = None, - persist: bool = False, - loader: loaders.ObjectLoader | None = None, -) -> dict[str, Any]: +class MsgContinue: + """ + Create the message payload to continue an existing process. """ - Create a message body to create a new process - :param process_class: the class of the process to launch - :param init_args: any initialisation positional arguments - :param init_kwargs: any initialisation keyword arguments - :param persist: persist this process if True, otherwise don't - :param loader: the loader to use to load the persisted process - :return: a dictionary with the body of the message to launch the process + @classmethod + def new( + cls, + pid: 'PID_TYPE', + tag: str | None = None, + nowait: bool = False, + ) -> dict[str, Any]: + """ + Create a message body to continue an existing process. + """ + return { + TASK_KEY: CONTINUE_TASK, + TASK_ARGS: { + PID_KEY: pid, + NOWAIT_KEY: nowait, + TAG_KEY: tag, + }, + } + + +class MsgCreate: + """ + Create the message payload to create a new process. """ - if loader is None: - loader = loaders.get_object_loader() - - msg_body = { - TASK_KEY: CREATE_TASK, - TASK_ARGS: { - PROCESS_CLASS_KEY: loader.identify_object(process_class), - PERSIST_KEY: persist, - ARGS_KEY: init_args, - KWARGS_KEY: init_kwargs, - }, - } - return msg_body + + @classmethod + def new( + cls, + process_class: str, + init_args: Sequence[Any] | None = None, + init_kwargs: dict[str, Any] | None = None, + persist: bool = False, + loader: 'loaders.ObjectLoader | None' = None, + ) -> dict[str, Any]: + """ + Create a message body to create a new process. + """ + if loader is None: + loader = loaders.get_object_loader() + + return { + TASK_KEY: CREATE_TASK, + TASK_ARGS: { + PROCESS_CLASS_KEY: loader.identify_object(process_class), + PERSIST_KEY: persist, + ARGS_KEY: init_args, + KWARGS_KEY: init_kwargs, + }, + } class ProcessLauncher: diff --git a/src/plumpy/process_states.py b/src/plumpy/process_states.py index 4f625272..0603a2ef 100644 --- a/src/plumpy/process_states.py +++ b/src/plumpy/process_states.py @@ -17,7 +17,7 @@ from typing_extensions import Self, override from plumpy.loaders import ObjectLoader -from plumpy.message import MessageBuilder, MessageType +from plumpy.message import Message, MsgKill, MsgPause from plumpy.persistence import ensure_object_loader try: @@ -64,17 +64,17 @@ class Interruption(Exception): # noqa: N818 class KillInterruption(Interruption): def __init__(self, msg_text: str | None): super().__init__() - msg = MessageBuilder.kill(text=msg_text) + msg = MsgKill.new(text=msg_text) - self.msg: MessageType = msg + self.msg: Message = msg class PauseInterruption(Interruption): def __init__(self, msg_text: str | None): super().__init__() - msg = MessageBuilder.pause(text=msg_text) + msg = MsgPause.new(text=msg_text) - self.msg: MessageType = msg + self.msg: Message = msg # region Commands @@ -104,7 +104,7 @@ def save(self, loader: ObjectLoader | None = None) -> SAVED_STATE_TYPE: @auto_persist('msg') class Kill(Command): - def __init__(self, msg: MessageType | None = None): + def __init__(self, msg: Message | None = None): super().__init__() self.msg = msg @@ -633,7 +633,7 @@ class Killed: is_terminal: ClassVar[bool] = True - def __init__(self, msg: MessageType | None): + def __init__(self, msg: Message | None): """ :param msg: Optional kill message """ diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index de96131a..ec215ecc 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -53,7 +53,7 @@ from .base.utils import call_with_super_check, super_check from .event_helper import EventHelper from .futures import CancellableAction, capture_exceptions -from .message import MESSAGE_TEXT_KEY, MessageBuilder, MessageType +from .message import MESSAGE_TEXT_KEY, Message, MsgKill, MsgPause from .process_listener import ProcessListener from .process_spec import ProcessSpec from .utils import PID_TYPE, SAVED_STATE_TYPE, protected @@ -564,7 +564,7 @@ def killed(self) -> bool: """Return whether the process is killed.""" return self.state_label == process_states.ProcessState.KILLED - def killed_msg(self) -> MessageType | None: + def killed_msg(self) -> Message | None: """Return the killed message.""" if isinstance(self.state, process_states.Killed): return self.state.msg @@ -899,7 +899,7 @@ def on_excepted(self) -> None: self._fire_event(ProcessListener.on_process_excepted, str(self.future().exception())) @super_check - def on_kill(self, msg: MessageType | None) -> None: + def on_kill(self, msg: Message | None) -> None: """Entering the KILLED state.""" if msg is None: msg_txt = '' @@ -945,7 +945,7 @@ def _fire_event(self, evt: Callable[..., Any], *args: Any, **kwargs: Any) -> Non # region Communication - def message_receive(self, _comm: Coordinator, msg: MessageType) -> Any: + def message_receive(self, _comm: Coordinator, msg: Message) -> Any: """ Coroutine called when the process receives a message from the communicator @@ -977,7 +977,7 @@ def message_receive(self, _comm: Coordinator, msg: MessageType) -> Any: raise RuntimeError('Unknown intent') def broadcast_receive( - self, _comm: Coordinator, msg: MessageType, sender: Any, subject: Any, correlation_id: Any + self, _comm: Coordinator, msg: Message, sender: Any, subject: Any, correlation_id: Any ) -> concurrent.futures.Future | None: """ Coroutine called when the process receives a message from the communicator @@ -1141,14 +1141,14 @@ def pause(self, msg_text: str | None = None) -> bool | CancellableAction: self._state.interrupt(interrupt_exception) return cast(CancellableAction, self._interrupt_action) - msg = MessageBuilder.pause(msg_text) + msg = MsgPause.new(msg_text) return self._do_pause(state_msg=msg) @staticmethod def _interrupt(state: Interruptable, reason: Exception) -> None: state.interrupt(reason) - def _do_pause(self, state_msg: MessageType | None, next_state: state_machine.State | None = None) -> bool: + def _do_pause(self, state_msg: Message | None, next_state: state_machine.State | None = None) -> bool: """Carry out the pause procedure, optionally transitioning to the next state first""" try: if next_state is not None: @@ -1267,7 +1267,7 @@ def kill(self, msg_text: str | None = None) -> bool | asyncio.Future: self._state.interrupt(interrupt_exception) return cast(CancellableAction, self._interrupt_action) - msg = MessageBuilder.kill(msg_text) + msg = MsgKill.new(msg_text) new_state = create_state(self, process_states.ProcessState.KILLED, msg=msg) self.transition_to(new_state) return True diff --git a/src/plumpy/rmq/process_control.py b/src/plumpy/rmq/process_control.py index c90daec4..4c644502 100644 --- a/src/plumpy/rmq/process_control.py +++ b/src/plumpy/rmq/process_control.py @@ -14,11 +14,14 @@ from plumpy.coordinator import Coordinator from plumpy.message import ( Intent, - MessageBuilder, - MessageType, - create_continue_body, - create_create_body, - create_launch_body, + Message, + MsgContinue, + MsgCreate, + MsgKill, + MsgLaunch, + MsgPause, + MsgPlay, + MsgStatus, ) from plumpy.utils import PID_TYPE @@ -87,7 +90,7 @@ async def get_status(self, pid: 'PID_TYPE') -> 'ProcessStatus': :param pid: the process id :return: the status response from the process """ - future = self._coordinator.rpc_send(pid, MessageBuilder.status()) + future = self._coordinator.rpc_send(pid, MsgStatus.new()) result = await asyncio.wrap_future(future) return result @@ -99,7 +102,7 @@ async def pause_process(self, pid: 'PID_TYPE', msg_text: Optional[str] = None) - :param msg: optional pause message :return: True if paused, False otherwise """ - msg = MessageBuilder.pause(text=msg_text) + msg = MsgPause.new(text=msg_text) pause_future = self._coordinator.rpc_send(pid, msg) # rpc_send return a thread future from coordinator @@ -115,7 +118,7 @@ async def play_process(self, pid: 'PID_TYPE') -> 'ProcessResult': :param pid: the pid of the process to play :return: True if played, False otherwise """ - play_future = self._coordinator.rpc_send(pid, MessageBuilder.play()) + play_future = self._coordinator.rpc_send(pid, MsgPlay.new()) future = await asyncio.wrap_future(play_future) result = await asyncio.wrap_future(future) return result @@ -128,7 +131,7 @@ async def kill_process(self, pid: 'PID_TYPE', msg_text: Optional[str] = None) -> :param msg: optional kill message :return: True if killed, False otherwise """ - msg = MessageBuilder.kill(text=msg_text) + msg = MsgKill.new(text=msg_text) # Wait for the communication to go through kill_future = self._coordinator.rpc_send(pid, msg) @@ -147,7 +150,7 @@ async def continue_process( :param pid: the pid of the process to continue :param tag: the checkpoint tag to continue from """ - message = create_continue_body(pid=pid, tag=tag, nowait=nowait) + message = MsgContinue.new(pid=pid, tag=tag, nowait=nowait) # Wait for the communication to go through continue_future = self._coordinator.task_send(message, no_reply=no_reply) future = await asyncio.wrap_future(continue_future) @@ -182,7 +185,7 @@ async def launch_process( :return: the result of launching the process """ - message = create_launch_body(process_class, init_args, init_kwargs, persist, loader, nowait) + message = MsgLaunch.new(process_class, init_args, init_kwargs, persist, loader, nowait) launch_future = self._coordinator.task_send(message, no_reply=no_reply) future = await asyncio.wrap_future(launch_future) @@ -215,13 +218,13 @@ async def execute_process( :return: the result of executing the process """ - message = create_create_body(process_class, init_args, init_kwargs, persist=True, loader=loader) + message = MsgCreate.new(process_class, init_args, init_kwargs, persist=True, loader=loader) create_future = self._coordinator.task_send(message) future = await asyncio.wrap_future(create_future) pid: 'PID_TYPE' = await asyncio.wrap_future(future) - message = create_continue_body(pid, nowait=nowait) + message = MsgContinue.new(pid, nowait=nowait) continue_future = self._coordinator.task_send(message, no_reply=no_reply) future = await asyncio.wrap_future(continue_future) @@ -253,7 +256,7 @@ def get_status(self, pid: 'PID_TYPE') -> kiwipy.Future: :param pid: the process id :return: the status response from the process """ - return self._coordinator.rpc_send(pid, MessageBuilder.status()) + return self._coordinator.rpc_send(pid, MsgStatus.new()) def pause_process(self, pid: 'PID_TYPE', msg_text: Optional[str] = None) -> kiwipy.Future: """ @@ -264,7 +267,7 @@ def pause_process(self, pid: 'PID_TYPE', msg_text: Optional[str] = None) -> kiwi :return: a response future from the process to be paused """ - msg = MessageBuilder.pause(text=msg_text) + msg = MsgPause.new(text=msg_text) return self._coordinator.rpc_send(pid, msg) @@ -274,7 +277,7 @@ def pause_all(self, msg_text: Optional[str]) -> None: :param msg: an optional pause message """ - msg = MessageBuilder.pause(text=msg_text) + msg = MsgPause.new(text=msg_text) self._coordinator.broadcast_send(msg, subject=Intent.PAUSE) def play_process(self, pid: 'PID_TYPE') -> kiwipy.Future: @@ -285,7 +288,7 @@ def play_process(self, pid: 'PID_TYPE') -> kiwipy.Future: :return: a response future from the process to be played """ - return self._coordinator.rpc_send(pid, MessageBuilder.play()) + return self._coordinator.rpc_send(pid, MsgPlay.new()) def play_all(self) -> None: """ @@ -301,7 +304,7 @@ def kill_process(self, pid: 'PID_TYPE', msg_text: Optional[str] = None) -> kiwip :param msg: optional kill message :return: a response future from the process to be killed """ - msg = MessageBuilder.kill(text=msg_text) + msg = MsgKill.new(text=msg_text) return self._coordinator.rpc_send(pid, msg) def kill_all(self, msg_text: Optional[str]) -> None: @@ -310,11 +313,11 @@ def kill_all(self, msg_text: Optional[str]) -> None: :param msg: an optional pause message """ - msg = MessageBuilder.kill(msg_text) + msg = MsgKill.new(msg_text) self._coordinator.broadcast_send(msg, subject=Intent.KILL) - def notify_msg(self, msg: MessageType, sender: Hashable | None = None, subject: str | None = None) -> None: + def notify_msg(self, msg: Message, sender: Hashable | None = None, subject: str | None = None) -> None: """ Notify all processes by broadcasting of a msg @@ -325,7 +328,7 @@ def notify_msg(self, msg: MessageType, sender: Hashable | None = None, subject: def continue_process( self, pid: 'PID_TYPE', tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False ) -> Union[None, PID_TYPE, ProcessResult]: - message = create_continue_body(pid=pid, tag=tag, nowait=nowait) + message = MsgContinue.new(pid=pid, tag=tag, nowait=nowait) return self._coordinator.task_send(message, no_reply=no_reply) def launch_process( @@ -350,7 +353,7 @@ def launch_process( :param no_reply: don't send a reply to the sender :return: the pid of the created process or the outputs (if nowait=False) """ - message = create_launch_body(process_class, init_args, init_kwargs, persist, loader, nowait) + message = MsgLaunch.new(process_class, init_args, init_kwargs, persist, loader, nowait) return self._coordinator.task_send(message, no_reply=no_reply) def execute_process( @@ -375,7 +378,7 @@ def execute_process( :param no_reply: if True, this call will be fire-and-forget, i.e. no return value :return: the result of executing the process """ - message = create_create_body(process_class, init_args, init_kwargs, persist=True, loader=loader) + message = MsgCreate.new(process_class, init_args, init_kwargs, persist=True, loader=loader) execute_future = kiwipy.Future() create_future = self._coordinator.task_send(message) diff --git a/tests/test_message.py b/tests/test_message.py index 0a6ee96c..dd42c137 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -2,7 +2,7 @@ import pytest import plumpy -from plumpy import message +from plumpy.message import MsgContinue, TASK_ARGS from tests import utils @@ -37,7 +37,7 @@ async def test_continue(): del process process = None - result = await launcher._continue(**plumpy.create_continue_body(pid)[message.TASK_ARGS]) + result = await launcher._continue(**MsgContinue.new(pid)[TASK_ARGS]) assert result == utils.DummyProcess.EXPECTED_OUTPUTS @@ -50,6 +50,6 @@ async def test_loader_is_used(): persister.save_checkpoint(proc) launcher = plumpy.ProcessLauncher(persister=persister, loader=loader) - continue_task = plumpy.create_continue_body(proc.pid) - result = await launcher._continue(**continue_task[message.TASK_ARGS]) + continue_task = MsgContinue.new(proc.pid) + result = await launcher._continue(**continue_task[TASK_ARGS]) assert result == utils.DummyProcess.EXPECTED_OUTPUTS diff --git a/tests/test_process_states.py b/tests/test_process_states.py index 07f41634..be142b59 100644 --- a/tests/test_process_states.py +++ b/tests/test_process_states.py @@ -4,7 +4,7 @@ from plumpy import process_states from plumpy.base.state_machine import StateMachine -from plumpy.message import MessageBuilder +from plumpy.message import MsgKill from plumpy.persistence import LoadSaveContext, Savable, load from plumpy.process_states import Command, Created, Excepted, Finished, Killed, Running, Waiting from plumpy.processes import Process @@ -84,7 +84,7 @@ def test_finished_savable(): def test_killed_savable(): - state = Killed(msg=MessageBuilder.kill('kill it')) + state = Killed(msg=MsgKill.new('kill it')) assert isinstance(state, Savable) saved_state = state.save() diff --git a/tests/test_processes.py b/tests/test_processes.py index 19544ae3..767442e4 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -9,7 +9,7 @@ import plumpy from plumpy import BundleKeys, Process, ProcessState -from plumpy.message import MESSAGE_TEXT_KEY, MessageBuilder +from plumpy.message import MESSAGE_TEXT_KEY, MsgKill from plumpy.persistence import Savable from plumpy.utils import AttributesFrozendict from . import utils @@ -435,7 +435,7 @@ class KillProcess(Process): after_kill = False def run(self, **kwargs): - msg = MessageBuilder.kill(text='killed') + msg = MsgKill.new(text='killed') self.kill(msg) # The following line should be executed because kill will not # interrupt execution of a method call in the RUNNING state @@ -577,7 +577,6 @@ def run(self): assert proc.state_label == plumpy.ProcessState.FINISHED def test_process_stack(self): - class StackTest(plumpy.Process): def run(self): assert self is Process.current() diff --git a/tests/utils.py b/tests/utils.py index a19563e1..2d19e333 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -11,7 +11,7 @@ import plumpy from plumpy import persistence, process_states, processes, utils from plumpy.exceptions import CoordinatorConnectionError -from plumpy.message import MessageBuilder +from plumpy.message import MsgKill from plumpy.rmq import TaskRejected import shortuuid @@ -208,7 +208,7 @@ def last_step(self): class KillProcess(processes.Process): @utils.override def run(self): - msg = MessageBuilder.kill(text='killed') + msg = MsgKill.new(text='killed') return process_states.Kill(msg=msg)