diff --git a/src/plumpy/message.py b/src/plumpy/message.py index 26cad8c5..9f27f16a 100644 --- a/src/plumpy/message.py +++ b/src/plumpy/message.py @@ -86,86 +86,98 @@ def status(cls, text: str | None = None) -> MessageType: MESSAGE_TEXT_KEY: text, } + @classmethod + def launch( + cls, + process_class: str, + init_args: Sequence[Any] | None = None, + init_kwargs: dict[str, Any] | None = None, + persist: bool = False, + loader: loaders.ObjectLoader | None = None, + nowait: bool = True, + ) -> dict[str, Any]: + """ + Create a message body for the launch action + + :param process_class: the class of the process to launch + :param init_args: any initialisation positional arguments + :param init_kwargs: any initialisation keyword arguments + :param persist: persist this process if True, otherwise don't + :param loader: the loader to use to load the persisted process + :param nowait: wait for the process to finish before completing the task, otherwise just return the PID + :return: a dictionary with the body of the message to launch the process + :rtype: dict + """ + if loader is None: + loader = loaders.get_object_loader() -def create_launch_body( - process_class: str, - init_args: Sequence[Any] | None = None, - init_kwargs: dict[str, Any] | None = None, - persist: bool = False, - loader: loaders.ObjectLoader | None = None, - nowait: bool = True, -) -> dict[str, Any]: - """ - Create a message body for the launch action - - :param process_class: the class of the process to launch - :param init_args: any initialisation positional arguments - :param init_kwargs: any initialisation keyword arguments - :param persist: persist this process if True, otherwise don't - :param loader: the loader to use to load the persisted process - :param nowait: wait for the process to finish before completing the task, otherwise just return the PID - :return: a dictionary with the body of the message to launch the process - :rtype: dict - """ - if loader is None: - loader = loaders.get_object_loader() - - msg_body = { - TASK_KEY: LAUNCH_TASK, - TASK_ARGS: { - PROCESS_CLASS_KEY: loader.identify_object(process_class), - PERSIST_KEY: persist, - NOWAIT_KEY: nowait, - ARGS_KEY: init_args, - KWARGS_KEY: init_kwargs, - }, - } - return msg_body - - -def create_continue_body(pid: 'PID_TYPE', tag: str | None = None, nowait: bool = False) -> dict[str, Any]: - """ - Create a message body to continue an existing process - :param pid: the pid of the existing process - :param tag: the optional persistence tag - :param nowait: wait for the process to finish before completing the task, otherwise just return the PID - :return: a dictionary with the body of the message to continue the process + return { + TASK_KEY: LAUNCH_TASK, + TASK_ARGS: { + PROCESS_CLASS_KEY: loader.identify_object(process_class), + PERSIST_KEY: persist, + NOWAIT_KEY: nowait, + ARGS_KEY: init_args, + KWARGS_KEY: init_kwargs, + }, + } - """ - msg_body = {TASK_KEY: CONTINUE_TASK, TASK_ARGS: {PID_KEY: pid, NOWAIT_KEY: nowait, TAG_KEY: tag}} - return msg_body + @classmethod + def continue_( + cls, + pid: 'PID_TYPE', + tag: str | None = None, + nowait: bool = False, + ) -> dict[str, Any]: + """ + Create a message body to continue an existing process + :param pid: the pid of the existing process + :param tag: the optional persistence tag + :param nowait: wait for the process to finish before completing the task, otherwise just return the PID + :return: a dictionary with the body of the message to continue the process + """ + return { + TASK_KEY: CONTINUE_TASK, + TASK_ARGS: { + PID_KEY: pid, + NOWAIT_KEY: nowait, + TAG_KEY: tag, + }, + } -def create_create_body( - process_class: str, - init_args: Sequence[Any] | None = None, - init_kwargs: dict[str, Any] | None = None, - persist: bool = False, - loader: loaders.ObjectLoader | None = None, -) -> dict[str, Any]: - """ - Create a message body to create a new process - :param process_class: the class of the process to launch - :param init_args: any initialisation positional arguments - :param init_kwargs: any initialisation keyword arguments - :param persist: persist this process if True, otherwise don't - :param loader: the loader to use to load the persisted process - :return: a dictionary with the body of the message to launch the process + @classmethod + def create( + cls, + process_class: str, + init_args: Sequence[Any] | None = None, + init_kwargs: dict[str, Any] | None = None, + persist: bool = False, + loader: loaders.ObjectLoader | None = None, + ) -> dict[str, Any]: + """ + Create a message body to create a new process + :param process_class: the class of the process to launch + :param init_args: any initialisation positional arguments + :param init_kwargs: any initialisation keyword arguments + :param persist: persist this process if True, otherwise don't + :param loader: the loader to use to load the persisted process + :return: a dictionary with the body of the message to launch the process - """ - if loader is None: - loader = loaders.get_object_loader() - - msg_body = { - TASK_KEY: CREATE_TASK, - TASK_ARGS: { - PROCESS_CLASS_KEY: loader.identify_object(process_class), - PERSIST_KEY: persist, - ARGS_KEY: init_args, - KWARGS_KEY: init_kwargs, - }, - } - return msg_body + """ + if loader is None: + loader = loaders.get_object_loader() + + msg_body = { + TASK_KEY: CREATE_TASK, + TASK_ARGS: { + PROCESS_CLASS_KEY: loader.identify_object(process_class), + PERSIST_KEY: persist, + ARGS_KEY: init_args, + KWARGS_KEY: init_kwargs, + }, + } + return msg_body class ProcessLauncher: