Skip to content

Commit

Permalink
Create/Launch/Continue body into builder
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Jan 30, 2025
1 parent 4edd4df commit 013e6d9
Showing 1 changed file with 87 additions and 75 deletions.
162 changes: 87 additions & 75 deletions src/plumpy/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,86 +86,98 @@ def status(cls, text: str | None = None) -> MessageType:
MESSAGE_TEXT_KEY: text,
}

@classmethod
def launch(
cls,
process_class: str,
init_args: Sequence[Any] | None = None,
init_kwargs: dict[str, Any] | None = None,
persist: bool = False,
loader: loaders.ObjectLoader | None = None,
nowait: bool = True,
) -> dict[str, Any]:
"""
Create a message body for the launch action
:param process_class: the class of the process to launch
:param init_args: any initialisation positional arguments
:param init_kwargs: any initialisation keyword arguments
:param persist: persist this process if True, otherwise don't
:param loader: the loader to use to load the persisted process
:param nowait: wait for the process to finish before completing the task, otherwise just return the PID
:return: a dictionary with the body of the message to launch the process
:rtype: dict
"""
if loader is None:
loader = loaders.get_object_loader()

def create_launch_body(
process_class: str,
init_args: Sequence[Any] | None = None,
init_kwargs: dict[str, Any] | None = None,
persist: bool = False,
loader: loaders.ObjectLoader | None = None,
nowait: bool = True,
) -> dict[str, Any]:
"""
Create a message body for the launch action
:param process_class: the class of the process to launch
:param init_args: any initialisation positional arguments
:param init_kwargs: any initialisation keyword arguments
:param persist: persist this process if True, otherwise don't
:param loader: the loader to use to load the persisted process
:param nowait: wait for the process to finish before completing the task, otherwise just return the PID
:return: a dictionary with the body of the message to launch the process
:rtype: dict
"""
if loader is None:
loader = loaders.get_object_loader()

msg_body = {
TASK_KEY: LAUNCH_TASK,
TASK_ARGS: {
PROCESS_CLASS_KEY: loader.identify_object(process_class),
PERSIST_KEY: persist,
NOWAIT_KEY: nowait,
ARGS_KEY: init_args,
KWARGS_KEY: init_kwargs,
},
}
return msg_body


def create_continue_body(pid: 'PID_TYPE', tag: str | None = None, nowait: bool = False) -> dict[str, Any]:
"""
Create a message body to continue an existing process
:param pid: the pid of the existing process
:param tag: the optional persistence tag
:param nowait: wait for the process to finish before completing the task, otherwise just return the PID
:return: a dictionary with the body of the message to continue the process
return {
TASK_KEY: LAUNCH_TASK,
TASK_ARGS: {
PROCESS_CLASS_KEY: loader.identify_object(process_class),
PERSIST_KEY: persist,
NOWAIT_KEY: nowait,
ARGS_KEY: init_args,
KWARGS_KEY: init_kwargs,
},
}

"""
msg_body = {TASK_KEY: CONTINUE_TASK, TASK_ARGS: {PID_KEY: pid, NOWAIT_KEY: nowait, TAG_KEY: tag}}
return msg_body
@classmethod
def continue_(
cls,
pid: 'PID_TYPE',
tag: str | None = None,
nowait: bool = False,
) -> dict[str, Any]:
"""
Create a message body to continue an existing process
:param pid: the pid of the existing process
:param tag: the optional persistence tag
:param nowait: wait for the process to finish before completing the task, otherwise just return the PID
:return: a dictionary with the body of the message to continue the process
"""
return {
TASK_KEY: CONTINUE_TASK,
TASK_ARGS: {
PID_KEY: pid,
NOWAIT_KEY: nowait,
TAG_KEY: tag,
},
}

def create_create_body(
process_class: str,
init_args: Sequence[Any] | None = None,
init_kwargs: dict[str, Any] | None = None,
persist: bool = False,
loader: loaders.ObjectLoader | None = None,
) -> dict[str, Any]:
"""
Create a message body to create a new process
:param process_class: the class of the process to launch
:param init_args: any initialisation positional arguments
:param init_kwargs: any initialisation keyword arguments
:param persist: persist this process if True, otherwise don't
:param loader: the loader to use to load the persisted process
:return: a dictionary with the body of the message to launch the process
@classmethod
def create(
cls,
process_class: str,
init_args: Sequence[Any] | None = None,
init_kwargs: dict[str, Any] | None = None,
persist: bool = False,
loader: loaders.ObjectLoader | None = None,
) -> dict[str, Any]:
"""
Create a message body to create a new process
:param process_class: the class of the process to launch
:param init_args: any initialisation positional arguments
:param init_kwargs: any initialisation keyword arguments
:param persist: persist this process if True, otherwise don't
:param loader: the loader to use to load the persisted process
:return: a dictionary with the body of the message to launch the process
"""
if loader is None:
loader = loaders.get_object_loader()

msg_body = {
TASK_KEY: CREATE_TASK,
TASK_ARGS: {
PROCESS_CLASS_KEY: loader.identify_object(process_class),
PERSIST_KEY: persist,
ARGS_KEY: init_args,
KWARGS_KEY: init_kwargs,
},
}
return msg_body
"""
if loader is None:
loader = loaders.get_object_loader()

msg_body = {
TASK_KEY: CREATE_TASK,
TASK_ARGS: {
PROCESS_CLASS_KEY: loader.identify_object(process_class),
PERSIST_KEY: persist,
ARGS_KEY: init_args,
KWARGS_KEY: init_kwargs,
},
}
return msg_body


class ProcessLauncher:
Expand Down

0 comments on commit 013e6d9

Please sign in to comment.