Workflow init decorator #645

Merged · 17 commits · Sep 24, 2024
Changes from 12 commits

57 changes: 40 additions & 17 deletions temporalio/worker/_workflow_instance.py
@@ -201,6 +201,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
self._payload_converter = det.payload_converter_class()
self._failure_converter = det.failure_converter_class()
self._defn = det.defn
+self._workflow_input: Optional[ExecuteWorkflowInput] = None
self._info = det.info
self._extern_functions = det.extern_functions
self._disable_eager_activity_execution = det.disable_eager_activity_execution
@@ -318,8 +319,9 @@ def get_thread_id(self) -> Optional[int]:
return self._current_thread_id

#### Activation functions ####
-# These are in alphabetical order and besides "activate", all other calls
-# are "_apply_" + the job field name.
+# These are in alphabetical order and besides "activate", and
+# "_make_workflow_input", all other calls are "_apply_" + the job field
+# name.

def activate(
self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation
@@ -342,6 +344,7 @@ def activate(
try:
# Split into job sets with patches, then signals + updates, then
# non-queries, then queries
+start_job = None
job_sets: List[
List[temporalio.bridge.proto.workflow_activation.WorkflowActivationJob]
] = [[], [], [], []]
@@ -351,10 +354,15 @@
elif job.HasField("signal_workflow") or job.HasField("do_update"):
job_sets[1].append(job)
elif not job.HasField("query_workflow"):
+if job.HasField("start_workflow"):
+start_job = job.start_workflow
job_sets[2].append(job)
else:
job_sets[3].append(job)

+if start_job:
+self._workflow_input = self._make_workflow_input(start_job)

# Apply every job set, running after each set
for index, job_set in enumerate(job_sets):
if not job_set:
@@ -863,34 +871,41 @@ async def run_workflow(input: ExecuteWorkflowInput) -> None:
return
raise

+if not self._workflow_input:
+raise RuntimeError(
+"Expected workflow input to be set. This is an SDK Python bug."
+)
+self._primary_task = self.create_task(
+self._run_top_level_workflow_function(run_workflow(self._workflow_input)),
+name="run",
+)

+def _apply_update_random_seed(
+self, job: temporalio.bridge.proto.workflow_activation.UpdateRandomSeed
+) -> None:
+self._random.seed(job.randomness_seed)

+def _make_workflow_input(
cretz (Member) commented:

Is there value in separating out this single-use method? I don't mind if it's just for clarity; just confirming there is not another reason.

dandavison (Contributor, Author) commented on Sep 23, 2024:

Yes, I strongly object to code styles that fail to break out helper functions. It doesn't matter at all to me whether they are used once or multiple times at this point in the codebase. Some reasons (could write much more...):

  • Abstraction: if there are (say) 10 or more lines within a function that create their own independent subset of local function state with the sole aim of computing a value, then it is much easier to read and reason about if that is broken out as a helper function. There is no need for the reader to understand the computation if they don't want to; it's one of the main reasons functions exist. As an extra benefit, the helper function is an opportunity to choose a good descriptive name for the unit of functionality. It also gives a target for unit testing.

  • Even if it were desirable as an alternative to a helper function (I don't think it is), Python doesn't have block-level scope; it's not possible to determine how the local state interacts with other parts of the function without reading all the implementation detail closely and mentally tracking the scope / lifetime of each variable.

  • Python has a strong and long tradition of being written as a collection of small functions; I think this is due to its long history of having no static typing. Even more so without static typing, functions have to be small to give readers and writers a chance of avoiding bugs.

https://martinfowler.com/bliki/FunctionLength.html is somewhat similar to my view.
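
To illustrate the block-scope point above, a minimal sketch (illustrative only, not from the PR thread):

    def summarize(prices: list[float]) -> float:
        total = 0.0
        for p in prices:
            total += p
        # Python has no block scope: 'p' is still bound here (to the last
        # element when the list is non-empty), so a reader must mentally
        # track the lifetime of every such name. Factoring the loop into a
        # helper such as _total(prices) would end that lifetime at the call
        # boundary.
        return total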

Reasons against:

  • Some people prefer reading large amounts of implementation detail in a single sequential pass and don't mind paying for that by tracking very complex function state in their brains, state that can trivially be decomposed into independent modules.

cretz (Member) commented:

All good, no strong preference. For the same reason people have a hard time reading many lines in a method, they have a hard time reading many methods in a class. I just want to make sure we strike a good balance and not have a ton of small, single-use methods littering the class (as opposed to normal reusable methods).

dandavison (Contributor, Author) commented on Sep 23, 2024:

Yeah, I think it's the great, often forgotten-about religious war in programming :)

> For the same reason people have a hard time reading many lines in a method, they have a hard time reading many methods in a class.

I don't agree that there's an equivalence, but I agree that the scope of helper functions has to be tastefully chosen, and that the author has to be very good at choosing function names. With those two things in hand, helper functions basically only make it easier to read. The point is that you can choose how deep into implementation detail you wish to go, whereas without helper functions you cannot choose.

cretz (Member) commented on Sep 24, 2024:

For myself, I struggle hopping around amongst disparately placed logic compared to scrolling (but still don't want to scroll too much, of course).

dandavison (Contributor, Author) commented on Sep 24, 2024:

Right, so the question is: why do you feel the need to hop to the implementation? A helper function has a return type and a well-chosen name. That should be all you need to understand it at the call site: you don't need to see the implementation detail of the helper function. Is the issue that you don't trust methods not to mutate instance state? If so, that implies you'd be happier with helper functions (with immutable parameters), just not so much with methods (in Python).

But the issue of mutating state is where helper functions bring a big advantage: if you include unrolled implementation logic at the call site, then in most languages it's very hard to tell whether the implementation is mutating local state, whereas a helper function makes this clearer or even guaranteed (i.e. in Python, pass immutable arguments; in other languages, pass non-writable references, etc.).

In general, IMO, complex logic should be written as a sequence of function calls. For example, this should be preferred over an unrolled version, even if the functions are called only once:

    artifact_globs = cli.options.artifact_globs or ["*"]
    remote_artifacts = _list_remote_artifacts(repos, artifact_globs)
    artifacts_to_download = _get_artifacts_not_in_db(db, remote_artifacts)
    downloaded_artifacts = _download_zip_artifacts(artifacts_to_download)
    rows = _parse_xml_from_zip_artifacts(downloaded_artifacts)
    rows = _fetch_pr_info(async_iterator_to_list(rows))
    db.insert_rows(rows)

+self, start_job: temporalio.bridge.proto.workflow_activation.StartWorkflow
+) -> ExecuteWorkflowInput:
# Set arg types, using raw values for dynamic
arg_types = self._defn.arg_types
if not self._defn.name:
# Dynamic is just the raw value for each input value
-arg_types = [temporalio.common.RawValue] * len(job.arguments)
-args = self._convert_payloads(job.arguments, arg_types)
+arg_types = [temporalio.common.RawValue] * len(start_job.arguments)
+args = self._convert_payloads(start_job.arguments, arg_types)
# Put args in a list if dynamic
if not self._defn.name:
args = [args]

-# Schedule it
-input = ExecuteWorkflowInput(
+return ExecuteWorkflowInput(
type=self._defn.cls,
# TODO(cretz): Remove cast when https://github.com/python/mypy/issues/5485 fixed
run_fn=cast(Callable[..., Awaitable[Any]], self._defn.run_fn),
args=args,
-headers=job.headers,
-)
-self._primary_task = self.create_task(
-self._run_top_level_workflow_function(run_workflow(input)),
-name="run",
+headers=start_job.headers,
+)

-def _apply_update_random_seed(
-self, job: temporalio.bridge.proto.workflow_activation.UpdateRandomSeed
-) -> None:
-self._random.seed(job.randomness_seed)

#### _Runtime direct workflow call overrides ####
# These are in alphabetical order and all start with "workflow_".

@@ -1617,6 +1632,14 @@ def _convert_payloads(
except Exception as err:
raise RuntimeError("Failed decoding arguments") from err

+def _instantiate_workflow_object(self) -> Any:
+if not self._workflow_input:
+raise RuntimeError("Expected workflow input. This is a Python SDK bug.")
+if hasattr(self._defn.cls.__init__, "__temporal_workflow_init"):
+return self._defn.cls(*self._workflow_input.args)
+else:
+return self._defn.cls()

def _is_workflow_failure_exception(self, err: BaseException) -> bool:
# An exception is a failure instead of a task fail if it's already a
# failure error or if it is an instance of any of the failure types in
@@ -1752,7 +1775,7 @@ def _run_once(self, *, check_conditions: bool) -> None:
# We instantiate the workflow class _inside_ here because __init__
# needs to run with this event loop set
if not self._object:
-self._object = self._defn.cls()
+self._object = self._instantiate_workflow_object()

# Run while there is anything ready
while self._ready:
66 changes: 57 additions & 9 deletions temporalio/workflow.py
@@ -143,10 +143,38 @@ def decorator(cls: ClassType) -> ClassType:
return decorator


def init(
init_fn: CallableType,
) -> CallableType:
"""Decorator for the workflow init method.

This may be used on the __init__ method of the workflow class to specify
that it accepts the same workflow input arguments as the ``@workflow.run``
method. It may not be used on any other method.

If used, the workflow will be instantiated as
``MyWorkflow(*workflow_input_args)``. If not used, the workflow will be
instantiated as ``MyWorkflow()``.

Note that the ``@workflow.run`` method is always called as
``my_workflow.my_run_method(*workflow_input_args)``. Therefore, if you use
the ``@workflow.init`` decorator, the parameter list of your __init__ and
``@workflow.run`` methods will usually be identical.
cretz (Member) commented:

> the parameter list of your __init__ and @workflow.run methods will usually be identical.

I think we should require/enforce that the params are identical if this decorator is present and the run method is present.

dandavison (Contributor, Author) commented on Sep 23, 2024:

OK. Yes, I was tentatively thinking of a contract without any static (decorator-time) constraints: if you use the decorator, then it is your responsibility to ensure that your __init__ signature will accept the arguments passed at workflow-run time. As long as that is true, the parameter signatures of the two methods can be whatever Python allows.

But I can't think of a clear counter-example where the stricter contract would make desirable code invalid, whereas enforcing identical params at decorator-eval time has the benefit of catching incorrect code at import time, and it can be relaxed in the future if we want to (whereas the looser contract cannot easily be made stricter).

I do think we want to allow names to differ, e.g. for this scenario where an underscore (perhaps required by a linter or style guide) is used to mark an unused parameter:

    @workflow.defn
    class MyWorkflow:
        @workflow.init
        def __init__(self, arg: str) -> None:
            self.value = arg

        @workflow.run
        async def run(self, _arg: str) -> str:
            return self.value

d8b5dff

Args:
init_fn: The __init__ function to decorate.
"""
if init_fn.__name__ != "__init__":
raise ValueError("@workflow.init may only be used on the __init__ method")

setattr(init_fn, "__temporal_workflow_init", True)
return init_fn
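
As a usage sketch of this decorator (the class and argument names here are illustrative, not from the SDK):

    @workflow.defn
    class GreetingWorkflow:
        @workflow.init
        def __init__(self, name: str) -> None:
            # __init__ receives the same argument that will be passed to
            # run(), so handlers that execute before run() starts can
            # already see self.name.
            self.name = name

        @workflow.run
        async def run(self, name: str) -> str:
            return f"hello, {self.name}"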


def run(fn: CallableAsyncType) -> CallableAsyncType:
"""Decorator for the workflow run method.

-This must be set on one and only one async method defined on the same class
+This must be used on one and only one async method defined on the same class
as ``@workflow.defn``. This can be defined on a base class method but must
then be explicitly overridden and defined on the workflow class.

@@ -238,7 +266,7 @@ def signal(
):
"""Decorator for a workflow signal method.

-This is set on any async or non-async method that you wish to be called upon
+This is used on any async or non-async method that you wish to be called upon
receiving a signal. If a function overrides one with this decorator, it too
must be decorated.

@@ -309,7 +337,7 @@ def query(
):
"""Decorator for a workflow query method.

-This is set on any non-async method that expects to handle a query. If a
+This is used on any non-async method that expects to handle a query. If a
function overrides one with this decorator, it too must be decorated.

Query methods can only have positional parameters. Best practice for
@@ -983,7 +1011,7 @@ def update(
):
"""Decorator for a workflow update handler method.

-This is set on any async or non-async method that you wish to be called upon
+This is used on any async or non-async method that you wish to be called upon
receiving an update. If a function overrides one with this decorator, it too
must be decorated.

@@ -1307,13 +1335,13 @@ def _apply_to_class(
issues: List[str] = []

# Collect run fn and all signal/query/update fns
-members = inspect.getmembers(cls)
+init_fn: Optional[Callable[..., None]] = None
run_fn: Optional[Callable[..., Awaitable[Any]]] = None
seen_run_attr = False
signals: Dict[Optional[str], _SignalDefinition] = {}
queries: Dict[Optional[str], _QueryDefinition] = {}
updates: Dict[Optional[str], _UpdateDefinition] = {}
-for name, member in members:
+for name, member in inspect.getmembers(cls):
if hasattr(member, "__temporal_workflow_run"):
seen_run_attr = True
if not _is_unbound_method_on_cls(member, cls):
@@ -1354,6 +1382,8 @@
)
else:
queries[query_defn.name] = query_defn
elif name == "__init__" and hasattr(member, "__temporal_workflow_init"):
init_fn = member
elif isinstance(member, UpdateMethodMultiParam):
update_defn = member._defn
if update_defn.name in updates:
@@ -1406,9 +1436,14 @@

if not seen_run_attr:
issues.append("Missing @workflow.run method")
-if len(issues) == 1:
-raise ValueError(f"Invalid workflow class: {issues[0]}")
-elif issues:
+if init_fn and run_fn:
+if not _parameters_identical_up_to_naming(init_fn, run_fn):
+issues.append(
+"@workflow.init and @workflow.run method parameters do not match"
+)
+if issues:
+if len(issues) == 1:
+raise ValueError(f"Invalid workflow class: {issues[0]}")
raise ValueError(
f"Invalid workflow class for {len(issues)} reasons: {', '.join(issues)}"
)
@@ -1444,6 +1479,19 @@ def __post_init__(self) -> None:
object.__setattr__(self, "ret_type", ret_type)


def _parameters_identical_up_to_naming(fn1: Callable, fn2: Callable) -> bool:
"""Return True if the functions have identical parameter lists, ignoring parameter names."""

def params(fn: Callable) -> List[inspect.Parameter]:
# Ignore name when comparing parameters (remaining fields are kind,
# default, and annotation).
return [p.replace(name="x") for p in inspect.signature(fn).parameters.values()]

# We require that any type annotations present match exactly; i.e. we do
# not support any notion of subtype compatibility.
return params(fn1) == params(fn2)
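
To make the comparison concrete, a short sketch of what this check accepts and rejects (illustrative only):

    def f1(self, arg: str) -> str: ...
    def f2(self, _arg: str) -> str: ...  # differs only in parameter name
    def f3(self, arg: int) -> str: ...  # differs in annotation

    assert _parameters_identical_up_to_naming(f1, f2)  # names are ignored
    assert not _parameters_identical_up_to_naming(f1, f3)  # int != str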


# Async safe version of partial
def _bind_method(obj: Any, fn: Callable[..., Any]) -> Callable[..., Any]:
# Curry instance on the definition function since that represents an
118 changes: 116 additions & 2 deletions tests/worker/test_workflow.py
@@ -3902,7 +3902,7 @@ def matches_metric_line(
return False
# Must have labels (don't escape for this test)
for k, v in at_least_labels.items():
-if not f'{k}="{v}"' in line:
+if f'{k}="{v}"' not in line:
return False
return line.endswith(f" {value}")

@@ -4856,7 +4856,7 @@ async def assert_scenario(
update_scenario: Optional[FailureTypesScenario] = None,
) -> None:
logging.debug(
f"Asserting scenario %s",
"Asserting scenario %s",
{
"workflow": workflow,
"expect_task_fail": expect_task_fail,
@@ -6032,3 +6032,117 @@ async def test_activity_retry_delay(client: Client):
err.cause.cause.next_retry_delay
== ActivitiesWithRetryDelayWorkflow.next_retry_delay
)


@workflow.defn
class WorkflowWithoutInit:
value = "from class attribute"
_expected_update_result = "from class attribute"

@workflow.update
async def my_update(self) -> str:
return self.value

@workflow.run
async def run(self, _: str) -> str:
self.value = "set in run method"
return self.value


@workflow.defn
class WorkflowWithWorkflowInit:
_expected_update_result = "workflow input value"

@workflow.init
def __init__(self, arg: str) -> None:
self.value = arg

@workflow.update
async def my_update(self) -> str:
return self.value

@workflow.run
async def run(self, _: str) -> str:
self.value = "set in run method"
return self.value


@workflow.defn
class WorkflowWithNonWorkflowInitInit:
_expected_update_result = "from parameter default"

def __init__(self, arg: str = "from parameter default") -> None:
self.value = arg

@workflow.update
async def my_update(self) -> str:
return self.value

@workflow.run
async def run(self, _: str) -> str:
self.value = "set in run method"
return self.value


@pytest.mark.parametrize(
["client_cls", "worker_cls"],
[
(WorkflowWithoutInit, WorkflowWithoutInit),
(WorkflowWithNonWorkflowInitInit, WorkflowWithNonWorkflowInitInit),
(WorkflowWithWorkflowInit, WorkflowWithWorkflowInit),
],
)
async def test_update_in_first_wft_sees_workflow_init(
client: Client, client_cls: Type, worker_cls: Type
):
"""
Test how @workflow.init affects what an update in the first WFT sees.

Such an update is guaranteed to start executing before the main workflow
coroutine. The update should see the side effects of the __init__ method if
and only if @workflow.init is in effect.
"""
# This test must ensure that the update is in the first WFT. To do so,
# before running the worker, we start the workflow, send the update, and
# wait until the update is admitted.
task_queue = "task-queue"
update_id = "update-id"
wf_handle = await client.start_workflow(
client_cls.run,
"workflow input value",
id=str(uuid.uuid4()),
task_queue=task_queue,
)
update_task = asyncio.create_task(
wf_handle.execute_update(client_cls.my_update, id=update_id)
)
await assert_eq_eventually(
True, lambda: workflow_update_exists(client, wf_handle.id, update_id)
)
# When the worker starts polling it will receive a first WFT containing the
# update, in addition to the start_workflow job.
async with new_worker(client, worker_cls, task_queue=task_queue):
assert await update_task == worker_cls._expected_update_result
assert await wf_handle.result() == "set in run method"


@workflow.defn
class WorkflowRunSeesWorkflowInitWorkflow:
@workflow.init
def __init__(self, arg: str) -> None:
self.value = arg

@workflow.run
async def run(self, _: str):
return f"hello, {self.value}"


async def test_workflow_run_sees_workflow_init(client: Client):
async with new_worker(client, WorkflowRunSeesWorkflowInitWorkflow) as worker:
workflow_result = await client.execute_workflow(
WorkflowRunSeesWorkflowInitWorkflow.run,
"world",
id=str(uuid.uuid4()),
task_queue=worker.task_queue,
)
assert workflow_result == "hello, world"