Skip to content

Commit

Permalink
Add Versioning Intents to Commands (#342)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Jul 11, 2023
1 parent d7238cd commit 317dd9b
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 1 deletion.
4 changes: 4 additions & 0 deletions temporalio/worker/_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import temporalio.api.common.v1
import temporalio.common
import temporalio.workflow
from temporalio.workflow import VersioningIntent


class Interceptor:
Expand Down Expand Up @@ -154,6 +155,7 @@ class ContinueAsNewInput:
memo: Optional[Mapping[str, Any]]
search_attributes: Optional[temporalio.common.SearchAttributes]
headers: Mapping[str, temporalio.api.common.v1.Payload]
versioning_intent: Optional[VersioningIntent]
# The types may be absent
arg_types: Optional[List[Type]]

Expand Down Expand Up @@ -226,6 +228,7 @@ class StartActivityInput:
cancellation_type: temporalio.workflow.ActivityCancellationType
headers: Mapping[str, temporalio.api.common.v1.Payload]
disable_eager_execution: bool
versioning_intent: Optional[VersioningIntent]
# The types may be absent
arg_types: Optional[List[Type]]
ret_type: Optional[Type]
Expand All @@ -250,6 +253,7 @@ class StartChildWorkflowInput:
memo: Optional[Mapping[str, Any]]
search_attributes: Optional[temporalio.common.SearchAttributes]
headers: Mapping[str, temporalio.api.common.v1.Payload]
versioning_intent: Optional[VersioningIntent]
# The types may be absent
arg_types: Optional[List[Type]]
ret_type: Optional[Type]
Expand Down
14 changes: 14 additions & 0 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ def workflow_continue_as_new(
retry_policy: Optional[temporalio.common.RetryPolicy],
memo: Optional[Mapping[str, Any]],
search_attributes: Optional[temporalio.common.SearchAttributes],
versioning_intent: Optional[temporalio.workflow.VersioningIntent],
) -> NoReturn:
# Use definition if callable
name: Optional[str] = None
Expand All @@ -694,6 +695,7 @@ def workflow_continue_as_new(
search_attributes=search_attributes,
headers={},
arg_types=arg_types,
versioning_intent=versioning_intent,
)
)
# TODO(cretz): Why can't MyPy infer the above never returns?
Expand Down Expand Up @@ -820,6 +822,7 @@ def workflow_start_activity(
retry_policy: Optional[temporalio.common.RetryPolicy],
cancellation_type: temporalio.workflow.ActivityCancellationType,
activity_id: Optional[str],
versioning_intent: Optional[temporalio.workflow.VersioningIntent],
) -> temporalio.workflow.ActivityHandle[Any]:
# Get activity definition if it's callable
name: str
Expand Down Expand Up @@ -851,6 +854,7 @@ def workflow_start_activity(
disable_eager_execution=self._disable_eager_activity_execution,
arg_types=arg_types,
ret_type=ret_type,
versioning_intent=versioning_intent,
)
)

Expand All @@ -871,6 +875,7 @@ async def workflow_start_child_workflow(
cron_schedule: str,
memo: Optional[Mapping[str, Any]],
search_attributes: Optional[temporalio.common.SearchAttributes],
versioning_intent: Optional[temporalio.workflow.VersioningIntent],
) -> temporalio.workflow.ChildWorkflowHandle[Any, Any]:
# Use definition if callable
name: str
Expand Down Expand Up @@ -905,6 +910,7 @@ async def workflow_start_child_workflow(
headers={},
arg_types=arg_types,
ret_type=ret_type,
versioning_intent=versioning_intent,
)
)

Expand Down Expand Up @@ -1679,6 +1685,10 @@ def _apply_schedule_command(
command.schedule_activity.do_not_eagerly_execute = (
self._input.disable_eager_execution
)
if self._input.versioning_intent:
command.schedule_activity.versioning_intent = (
self._input.versioning_intent._to_proto()
)
if isinstance(self._input, StartLocalActivityInput):
if self._input.local_retry_threshold:
command.schedule_local_activity.local_retry_threshold.FromTimedelta(
Expand Down Expand Up @@ -1810,6 +1820,8 @@ def _apply_start_command(
"temporalio.bridge.proto.child_workflow.ChildWorkflowCancellationType.ValueType",
int(self._input.cancellation_type),
)
if self._input.versioning_intent:
v.versioning_intent = self._input.versioning_intent._to_proto()

# If request cancel external, result does _not_ have seq
def _apply_cancel_command(
Expand Down Expand Up @@ -1907,6 +1919,8 @@ def _apply_command(
_encode_search_attributes(
self._input.search_attributes, v.search_attributes
)
if self._input.versioning_intent:
v.versioning_intent = self._input.versioning_intent._to_proto()


def _encode_search_attributes(
Expand Down
Loading

0 comments on commit 317dd9b

Please sign in to comment.