-
Notifications
You must be signed in to change notification settings - Fork 58
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Change minimum Python to 3.9 and add Trio sample (#162)
Fixes #161
- Loading branch information
Showing
12 changed files
with
312 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import uuid | ||
|
||
import trio_asyncio | ||
from temporalio.client import Client | ||
from temporalio.worker import Worker | ||
|
||
from trio_async import activities, workflows | ||
|
||
|
||
async def test_workflow_with_trio(client: Client): | ||
@trio_asyncio.aio_as_trio | ||
async def inside_trio(client: Client) -> list[str]: | ||
# Create Trio thread executor | ||
with trio_asyncio.TrioExecutor(max_workers=200) as thread_executor: | ||
task_queue = f"tq-{uuid.uuid4()}" | ||
# Run worker | ||
async with Worker( | ||
client, | ||
task_queue=task_queue, | ||
activities=[ | ||
activities.say_hello_activity_async, | ||
activities.say_hello_activity_sync, | ||
], | ||
workflows=[workflows.SayHelloWorkflow], | ||
activity_executor=thread_executor, | ||
workflow_task_executor=thread_executor, | ||
): | ||
# Run workflow and return result | ||
return await client.execute_workflow( | ||
workflows.SayHelloWorkflow.run, | ||
"some-user", | ||
id=f"wf-{uuid.uuid4()}", | ||
task_queue=task_queue, | ||
) | ||
|
||
result = trio_asyncio.run(inside_trio, client) | ||
assert result == [ | ||
"Hello, some-user! (from asyncio)", | ||
"Hello, some-user! (from thread)", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# Trio Async Sample | ||
|
||
This sample shows how to use Temporal asyncio with [Trio](https://trio.readthedocs.io) using | ||
[Trio asyncio](https://trio-asyncio.readthedocs.io). Specifically it demonstrates using a traditional Temporal client | ||
and worker in a Trio setting, and how Trio-based code can run in both asyncio async activities and threaded sync | ||
activities. | ||
|
||
For this sample, the optional `trio_async` dependency group must be included. To include, run: | ||
|
||
poetry install --with trio_async | ||
|
||
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the | ||
worker: | ||
|
||
poetry run python worker.py | ||
|
||
This will start the worker. Then, in another terminal, run the following to execute the workflow: | ||
|
||
poetry run python starter.py | ||
|
||
The starter should complete with: | ||
|
||
INFO:root:Workflow result: ['Hello, Temporal! (from asyncio)', 'Hello, Temporal! (from thread)'] |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
import asyncio | ||
import time | ||
|
||
import trio | ||
import trio_asyncio | ||
from temporalio import activity | ||
|
||
|
||
# An asyncio-based async activity | ||
@activity.defn | ||
async def say_hello_activity_async(name: str) -> str: | ||
# Demonstrate a sleep in both asyncio and Trio, showing that both asyncio | ||
# and Trio primitives can be used | ||
|
||
# First asyncio | ||
activity.logger.info("Sleeping in asyncio") | ||
await asyncio.sleep(0.1) | ||
|
||
# Now Trio. We have to invoke the function separately decorated. | ||
# We cannot use the @trio_as_aio decorator on the activity itself because | ||
# it doesn't use functools wrap or similar so it doesn't respond to things | ||
# like __name__ that @activity.defn needs. | ||
return await say_hello_in_trio_from_asyncio(name) | ||
|
||
|
||
@trio_asyncio.trio_as_aio | ||
async def say_hello_in_trio_from_asyncio(name: str) -> str: | ||
activity.logger.info("Sleeping in Trio (from asyncio)") | ||
await trio.sleep(0.1) | ||
return f"Hello, {name}! (from asyncio)" | ||
|
||
|
||
# A thread-based sync activity | ||
@activity.defn | ||
def say_hello_activity_sync(name: str) -> str: | ||
# Demonstrate a sleep in both threaded and Trio, showing that both | ||
# primitives can be used | ||
|
||
# First, thread-blocking | ||
activity.logger.info("Sleeping normally") | ||
time.sleep(0.1) | ||
|
||
# Now Trio. We have to use Trio's thread sync tools to run trio calls from | ||
# a different thread. | ||
return trio.from_thread.run(say_hello_in_trio_from_sync, name) | ||
|
||
|
||
async def say_hello_in_trio_from_sync(name: str) -> str: | ||
activity.logger.info("Sleeping in Trio (from thread)") | ||
await trio.sleep(0.1) | ||
return f"Hello, {name}! (from thread)" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
import logging | ||
|
||
import trio_asyncio | ||
from temporalio.client import Client | ||
|
||
from trio_async import workflows | ||
|
||
|
||
@trio_asyncio.aio_as_trio # Note this decorator which allows asyncio primitives | ||
async def main(): | ||
logging.basicConfig(level=logging.INFO) | ||
|
||
# Connect client | ||
client = await Client.connect("localhost:7233") | ||
|
||
# Execute the workflow | ||
result = await client.execute_workflow( | ||
workflows.SayHelloWorkflow.run, | ||
"Temporal", | ||
id=f"trio-async-workflow-id", | ||
task_queue="trio-async-task-queue", | ||
) | ||
logging.info(f"Workflow result: {result}") | ||
|
||
|
||
if __name__ == "__main__": | ||
# Note how we're using Trio event loop, not asyncio | ||
trio_asyncio.run(main) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import asyncio | ||
import logging | ||
import os | ||
import sys | ||
|
||
import trio_asyncio | ||
from temporalio.client import Client | ||
from temporalio.worker import Worker | ||
|
||
from trio_async import activities, workflows | ||
|
||
|
||
@trio_asyncio.aio_as_trio # Note this decorator which allows asyncio primitives | ||
async def main(): | ||
logging.basicConfig(level=logging.INFO) | ||
|
||
# Connect client | ||
client = await Client.connect("localhost:7233") | ||
|
||
# Temporal runs threaded activities and workflow tasks via run_in_executor. | ||
# Due to how trio_asyncio works, you can only do run_in_executor with their | ||
# specific executor. We make sure to give it 200 max since we are using it | ||
# for both activities and workflow tasks and by default the worker supports | ||
# 100 max concurrent activity tasks and 100 max concurrent workflow tasks. | ||
with trio_asyncio.TrioExecutor(max_workers=200) as thread_executor: | ||
|
||
# Run a worker for the workflow | ||
async with Worker( | ||
client, | ||
task_queue="trio-async-task-queue", | ||
activities=[ | ||
activities.say_hello_activity_async, | ||
activities.say_hello_activity_sync, | ||
], | ||
workflows=[workflows.SayHelloWorkflow], | ||
activity_executor=thread_executor, | ||
workflow_task_executor=thread_executor, | ||
): | ||
# Wait until interrupted | ||
logging.info("Worker started, ctrl+c to exit") | ||
try: | ||
await asyncio.Future() | ||
except asyncio.CancelledError: | ||
# Ignore, happens on ctrl+C | ||
pass | ||
finally: | ||
logging.info("Shutting down") | ||
|
||
|
||
if __name__ == "__main__": | ||
# Note how we're using Trio event loop, not asyncio | ||
try: | ||
trio_asyncio.run(main) | ||
except KeyboardInterrupt: | ||
# Ignore ctrl+c | ||
pass | ||
except BaseException as err: | ||
# On Python 3.11+ Trio represents keyboard interrupt inside an exception | ||
# group | ||
is_interrupt = ( | ||
sys.version_info >= (3, 11) | ||
and isinstance(err, BaseExceptionGroup) | ||
and err.subgroup(KeyboardInterrupt) | ||
) | ||
if not is_interrupt: | ||
raise |
Oops, something went wrong.