updates_do_not_block_continue_as_new (#584)
* Test update doesn't block CAN and is handled on next run
dandavison authored Jan 23, 2025
1 parent 214b0a6 commit 45af634
Showing 8 changed files with 254 additions and 14 deletions.
@@ -1,19 +1,24 @@
-# signals_block_completion
+# signals_block_continue_as_new

Workflows are subject to a maximum history size. As a result, very long running or
infinite lifetime workflows require some mechanism to avoid running into this limit.
Continue as new provides that facility by allowing a workflow to complete and pass
state to a new execution of the same workflow.


# Detailed spec

* Workflows may choose to continue as new at any point. Semantically, this is best thought
of as the workflow returning with a special value indicating it would like to continue.
However, many SDKs choose to implement this as a free-floating API that may be called anywhere
in workflow code, or signal handlers, etc.
-* When that happens, the next WFT should have a ContinueAsNewWorkflowExecution command
-* If (this is the case for any execution-completing command) the server has received new signals
-  for the workflow while the WFT was being processed, the WFT must be retried.
+* When that happens, the next WFT response should have a ContinueAsNewWorkflowExecution command
+* If the server has received new signals for the workflow while the WFT was being processed, the WFT must be
+  retried. Note that this is the case for any execution-completing command; not just continue as new.
* Users should be aware that they may want to ensure signal channels are drained before
  continuing as new, if the language (Go) doesn't use explicit handlers.


# Detailed spec

* The client starts a workflow that will continue as new (CAN).
* The client sends a signal in such a way that it is guaranteed that it is made durable by the server while the WFT is in flight.
* The workflow responds to the WFT with a ContinueAsNewWorkflowExecution command.
* Verify that the WFT is retried and that the signal is handled on the pre-CAN run.
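
For concreteness, here is a minimal sketch, not part of this commit, of what the behavior bullets above look like in the Temporal Python SDK, where handlers are explicit: the workflow waits for any in-flight handlers to finish before issuing the continue-as-new, then passes its accumulated state to the next run. The workflow name, signal name, and the threshold of 100 are invented for illustration.

```python
from temporalio import workflow


@workflow.defn
class AccumulatorWorkflow:
    """Illustrative only: accumulates signalled items, then continues as new."""

    def __init__(self) -> None:
        self.items: list[str] = []

    @workflow.signal
    def add_item(self, item: str) -> None:
        self.items.append(item)

    @workflow.run
    async def run(self, items: list[str]) -> None:
        self.items = list(items)
        # Wait until the accumulated state is large enough that we want to
        # roll history over into a fresh execution.
        await workflow.wait_condition(lambda: len(self.items) >= 100)
        # Python's analogue of draining Go signal channels: let any in-flight
        # handlers finish before this run completes.
        await workflow.wait_condition(workflow.all_handlers_finished)
        # "Return with a special value": complete this run and hand the
        # accumulated state to a new execution of the same workflow.
        workflow.continue_as_new(self.items)
```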
@@ -0,0 +1,14 @@
# updates_do_not_block_continue_as_new

Workflows are subject to a maximum history size. As a result, very long running or
infinite lifetime workflows require some mechanism to avoid running into this limit.
Continue as new provides that facility by allowing a workflow to complete and pass
state to a new execution of the same workflow.

# Detailed spec

* The client starts a workflow that will continue as new (CAN).
* The client sends an update in such a way that it is guaranteed that it is admitted by the server while the WFT is in flight.
* The workflow responds to the WFT with a ContinueAsNewWorkflowExecution command.
* The workflow handles the update in a way that returns information to the caller about the run on which it was handled.
* Verify that the update was handled on the post-CAN run.
@@ -0,0 +1,125 @@
"""
In this test the client code sends an update, guaranteeing that the workflow worker is
processing a workflow task (WFT) at the time that the update is admitted by the server. To
do this it must synchronize the workflow and client. This uses techniques that should
never be used in real workflows. The synchronization must be threading-based as opposed to
asyncio-based, since the point is to not allow the workflow to yield while it is waiting
for notification from the client. In order for the workflow and client to share the same
module namespace, we use UnsandboxedWorkflowRunner. But this means that the workflow and
client code execute in the same thread. Therefore we do the client's thread-blocking
synchronization calls in a new thread, via asyncio.to_thread, so that both client and
workflow can use thread-blocking waits on the shared threading.Event object.
"""

import asyncio
import threading
from datetime import timedelta
from uuid import uuid4

from temporalio import workflow
from temporalio.api.enums.v1 import EventType
from temporalio.client import WorkflowHandle
from temporalio.worker import UnsandboxedWorkflowRunner, WorkerConfig

from harness.python.feature import Runner, register_feature
from harness.python.util import admitted_update_task

# See docstring at top of file.
first_run_wft_is_in_progress = threading.Event()
update_has_been_admitted = threading.Event()


@workflow.defn
class Workflow:
def __init__(self):
self.received_update = False

@workflow.run
async def run(self) -> str:
"""
Continue as new once, then return the current run ID.
"""
if not first_run_wft_is_in_progress.is_set():
# Note: you should usually never block the thread in workflow code.
# See docstring at top of file.
first_run_wft_is_in_progress.set()
update_has_been_admitted.wait()

info = workflow.info()
if info.continued_run_id is not None:
# The update is probably delivered in the first post-CAN WFT, in which case
# the following wait_condition is not needed. However, correct behavior does
# not require this to be true.
await workflow.wait_condition(lambda: self.received_update)
return info.run_id

workflow.continue_as_new()

@workflow.update
async def update(self) -> str:
"""Update handler that returns the current run ID"""
self.received_update = True
return workflow.info().run_id


async def start(runner: Runner) -> WorkflowHandle:
return await runner.client.start_workflow(
Workflow.run,
id=str(uuid4()),
task_queue=runner.task_queue,
execution_timeout=timedelta(minutes=1),
)


async def check_result(runner: Runner, handle: WorkflowHandle) -> None:
# See docstring at top of file.
# Cause an update to be admitted while the first WFT is in progress
await asyncio.to_thread(first_run_wft_is_in_progress.wait)
# The workflow is now blocking its thread waiting for the update to be admitted
update_task = await admitted_update_task(
runner.client, handle, Workflow.update, "update-id"
)
# Unblock the workflow so that it responds to the WFT with a CAN command.
update_has_been_admitted.set()
# The workflow will now CAN. Wait for the update result
update_run_id = await update_task

# The update should have been handled on the post-CAN run.
assert (
handle.first_execution_run_id
and update_run_id
and update_run_id != handle.first_execution_run_id
), "Expected update to be handled on post-CAN run"

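    # Cross-check against event history: the update's accepted/completed events
    # must appear only in the post-CAN run's history, not in the first run's.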
update_event_types = {
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED,
}

async def get_event_types(run_id: str) -> set[EventType.ValueType]:
return {
event.event_type
for event in (
await runner.client.get_workflow_handle(
handle.id, run_id=run_id
).fetch_history()
).events
}

assert not (
update_event_types & await get_event_types(handle.first_execution_run_id)
), "Update should not appear in pre-CAN history"

assert update_event_types <= await get_event_types(
update_run_id
), "Update events should appear in post-CAN history"


register_feature(
workflows=[Workflow],
start=start,
check_result=check_result,
# Disable sandbox in order to allow this test to "cheat" by sharing state between the
# client and the workflow.
worker_config=WorkerConfig(workflow_runner=UnsandboxedWorkflowRunner()),
)
1 change: 1 addition & 0 deletions harness/python/feature.py
@@ -126,6 +126,7 @@ def __init__(
if tls_config is not None:
self.tls_config = tls_config
self.http_proxy_url = http_proxy_url
self.client: Client

async def run(self) -> None:
logger.info("Executing feature %s", self.feature.rel_dir)
76 changes: 76 additions & 0 deletions harness/python/util.py
@@ -0,0 +1,76 @@
import asyncio
import time
from datetime import timedelta
from typing import Awaitable, Callable, TypeVar

from temporalio.api.common.v1 import WorkflowExecution
from temporalio.api.update.v1 import UpdateRef
from temporalio.api.workflowservice.v1 import PollWorkflowExecutionUpdateRequest
from temporalio.client import Client, WorkflowHandle
from temporalio.service import RPCError, RPCStatusCode
from temporalio.workflow import UpdateMethodMultiParam

# The update utilities below are copied from
# https://github.com/temporalio/sdk-python/blob/main/tests/helpers/__init__.py


async def admitted_update_task(
client: Client,
handle: WorkflowHandle,
update_method: UpdateMethodMultiParam,
id: str,
**kwargs,
) -> asyncio.Task:
"""
Return an asyncio.Task for an update after waiting for it to be admitted.
"""
update_task = asyncio.create_task(
handle.execute_update(update_method, id=id, **kwargs)
)
await assert_eq_eventually(
True,
lambda: workflow_update_has_been_admitted(client, handle.id, id),
)
return update_task


async def workflow_update_has_been_admitted(
client: Client, workflow_id: str, update_id: str
) -> bool:
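    """
    Return True if the server's PollWorkflowExecutionUpdate RPC finds the update
    (i.e. it has been admitted), and False if the RPC responds with NOT_FOUND.
    Any other RPC error is re-raised.
    """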
try:
await client.workflow_service.poll_workflow_execution_update(
PollWorkflowExecutionUpdateRequest(
namespace=client.namespace,
update_ref=UpdateRef(
workflow_execution=WorkflowExecution(workflow_id=workflow_id),
update_id=update_id,
),
)
)
return True
except RPCError as err:
if err.status != RPCStatusCode.NOT_FOUND:
raise
return False


T = TypeVar("T")


async def assert_eq_eventually(
expected: T,
fn: Callable[[], Awaitable[T]],
*,
timeout: timedelta = timedelta(seconds=10),
interval: timedelta = timedelta(milliseconds=200),
) -> None:
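    """
    Repeatedly call ``fn`` every ``interval`` until it returns ``expected``,
    failing with an assertion error if it does not do so within ``timeout``.
    """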
start_sec = time.monotonic()
last_value = None
while timedelta(seconds=time.monotonic() - start_sec) < timeout:
last_value = await fn()
if expected == last_value:
return
await asyncio.sleep(interval.total_seconds())
assert (
expected == last_value
), f"timed out waiting for equal, asserted against last value of {last_value}"
27 changes: 23 additions & 4 deletions poetry.lock
