Skip to content

Commit

Permalink
Python concepts (#456)
Browse files Browse the repository at this point in the history
* Python durable building blocks page

* Python concept services page

* Add invocations page concepts Python

* Python concepts fixes
  • Loading branch information
gvdongen authored Sep 9, 2024
1 parent 43b8d3a commit bc73d3e
Show file tree
Hide file tree
Showing 19 changed files with 568 additions and 17 deletions.
Empty file.
93 changes: 93 additions & 0 deletions code_snippets/python/src/concepts/food_ordering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import typing
import uuid
from datetime import timedelta
from typing import TypedDict
from restate import VirtualObject, ObjectContext

from src.concepts.utils import PaymentClient, Restaurant


class Order(TypedDict):
id: str
total_cost: int
delivery_delay: int


payment_client = PaymentClient()
restaurant = Restaurant()

order_workflow = VirtualObject("OrderWorkflow")


# <start_here>
# <mark_1>
@order_workflow.handler()
async def process(ctx: ObjectContext, order: Order):
# </mark_1>
# 1. Set status
# <mark_4>
ctx.set("status", "CREATED")
# </mark_4>

# 2. Handle payment
# <mark_5>
token = await ctx.run("token", lambda: str(uuid.uuid4()))

async def pay():
await payment_client.charge(order["id"], token, order["total_cost"])

paid = await ctx.run("payment", pay)
# </mark_5>

if not paid:
# <mark_4>
ctx.set("status", "REJECTED")
# </mark_4>
return

# 3. Wait until the requested preparation time
# <mark_4>
ctx.set("status", "SCHEDULED")
# </mark_4>
await ctx.sleep(timedelta(milliseconds=order["delivery_delay"]))

# 4. Trigger preparation
# <mark_3>
preparation_id, preparation_promise = ctx.awakeable()
# <mark_5>
await ctx.run(
"prepare",
lambda: restaurant.prepare(order["id"], preparation_id)
)
# </mark_5>
# </mark_3>
# <mark_4>
ctx.set("status", "IN_PREPARATION")
# </mark_4>
# <mark_3>
await preparation_promise
# </mark_3>

# <mark_4>
ctx.set("status", "SCHEDULING_DELIVERY")
# </mark_4>

# 5. Find a driver and start delivery
# <mark_2>
await ctx.object_call(start_delivery, key=order["id"], arg=order)
# </mark_2>
# <mark_4>
ctx.set("status", "DELIVERED")
# </mark_4>
# <end_here>


delivery_manager = VirtualObject("DeliveryManager")


@delivery_manager.handler()
async def start_delivery(ctx: ObjectContext, order: Order):
ctx.set("status", "SCHEDULED")
await ctx.sleep(timedelta(minutes=5))
ctx.set("status", "DELIVERED")
return True
Empty file.
15 changes: 15 additions & 0 deletions code_snippets/python/src/concepts/invocations/delayed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from datetime import timedelta

from restate import Service, Context

from src.concepts.invocations.utils import greet

my_service = Service("MyService")


# <start_delayed_call>
@my_service.handler()
async def my_handler(ctx: Context, arg):
# focus
ctx.service_send(greet, arg="Hi", send_delay=timedelta(seconds=1))
# <end_delayed_call>
13 changes: 13 additions & 0 deletions code_snippets/python/src/concepts/invocations/one_way.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from restate import Service, Context

from src.concepts.invocations.utils import greet

my_service = Service("MyService")


# <start_one_way_call>
@my_service.handler()
async def my_handler(ctx: Context, arg):
# focus
ctx.service_send(greet, arg="Hi")
# <end_one_way_call>
13 changes: 13 additions & 0 deletions code_snippets/python/src/concepts/invocations/rpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from restate import Service, Context

from src.concepts.invocations.utils import greet

my_service = Service("MyService")


# <start_rpc_call>
@my_service.handler()
async def my_handler(ctx: Context, arg):
# focus
greeting = await ctx.service_call(greet, arg="Hi")
# <end_rpc_call>
29 changes: 29 additions & 0 deletions code_snippets/python/src/concepts/invocations/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from restate import Context, Service, VirtualObject, ObjectContext, Workflow, WorkflowContext, WorkflowSharedContext

greeter = Service("Greeter")


@greeter.handler()
async def greet(ctx: Context, greeting: str) -> str:
return f"{greeting}"


greet_counter = VirtualObject("GreetCounter")


@greet_counter.handler()
async def count_greet(ctx: ObjectContext, count: int) -> int:
return 1


my_workflow = Workflow("MyWorkflow")


@my_workflow.main()
async def run(ctx: WorkflowContext, greeting: str):
return


@my_workflow.handler()
async def my_other_handler(ctx: WorkflowSharedContext, req: str) -> str:
return ""
71 changes: 71 additions & 0 deletions code_snippets/python/src/concepts/services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from typing import TypedDict

import restate
from restate import Service, Context

from typing import TypedDict, List
import random
import logging


class UserRole(TypedDict):
roleKey: str
roleDescription: str


class Permission(TypedDict):
permissionKey: str
setting: str


class UpdateRequest(TypedDict):
userId: str
role: UserRole
permissions: List[Permission]


def apply_user_role(user_id: str, user_role: UserRole) -> bool:
maybe_crash(0.3)
logging.info(f'>>> Applied role {user_role["roleKey"]} for user {user_id}')
return True


def apply_permission(user_id: str, permission: Permission) -> None:
maybe_crash(0.2)
logging.info(f'>>> Applied permission {permission["permissionKey"]}:{permission["setting"]} for user {user_id}')


def maybe_crash(probability: float) -> None:
if random.random() < probability:
raise Exception("Simulated crash")


# <start_here>
role_update_service = Service("RoleUpdateService")


# <mark_2>
@role_update_service.handler()
async def apply_role_update(ctx: Context, update: UpdateRequest):
# </mark_2>

# <mark_1>
success = await ctx.run("role",
lambda: apply_user_role(update["userId"], update["role"]))
# </mark_1>
# <mark_3>
if not success:
return
# </mark_3>

# <mark_3>
for permission in update["permissions"]:
# </mark_3>
# <mark_1>
await ctx.run("permission",
lambda: apply_permission(update["userId"], permission))
# </mark_1>


app = restate.app([role_update_service])
# <end_here>
14 changes: 14 additions & 0 deletions code_snippets/python/src/concepts/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

class PaymentClient:
async def charge(self, id: str, token: str, amount: int) -> bool:
return True


class EmailClient:
async def send_success_notification(self, mail: str) -> bool:
return True


class Restaurant:
def prepare(self, order_id: str, preparation_id: str):
return True
46 changes: 46 additions & 0 deletions code_snippets/python/src/concepts/virtual_objects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import restate
from restate import VirtualObject, ObjectContext

# <start_here>
# <mark_1>
greeter = VirtualObject("Greeter")
# </mark_1>


# <mark_1>
# <mark_3>
@greeter.handler()
async def greet(ctx: ObjectContext, greeting: str) -> str:
# </mark_3>
# </mark_1>
# <mark_2>
count = await ctx.get("count") or 0
count += 1
ctx.set("count", count)
# </mark_2>
# <mark_1>
return f"{greeting} {ctx.key} for the {count}-th time."
# </mark_1>


# <mark_1>
# <mark_3>
@greeter.handler()
async def ungreet(ctx: ObjectContext) -> str:
# </mark_3>
# </mark_1>
# <mark_2>
count = await ctx.get("count") or 0
# </mark_2>
if count > 0:
# <mark_2>
count -= 1
ctx.set("count", count)
# </mark_2>
# <mark_1>
return f"Dear {ctx.key}, taking one greeting back: {count}."
# </mark_1>


app = restate.app([greeter])
# <end_here>
72 changes: 72 additions & 0 deletions code_snippets/python/src/concepts/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from typing import TypedDict

import restate
from restate import Workflow, WorkflowContext, WorkflowSharedContext
from restate.exceptions import TerminalError

from src.concepts.utils import PaymentClient, EmailClient


class PaymentRequest(TypedDict):
amount: int
account: str
email: str


class PaymentSuccess(TypedDict):
account: str


payment_client = PaymentClient()
email_client = EmailClient()


# <start_here>
payment_workflow = Workflow("Payment")


# <mark_1>
@payment_workflow.main()
async def run(ctx: WorkflowContext, payment: PaymentRequest):
# Validate payment. If not valid, end workflow right here without retries.
if payment["amount"] < 0:
raise TerminalError("Payment refused: negative amount")

async def pay():
return await payment_client.charge(ctx.key(), payment["account"], payment["amount"])
await ctx.run("make a payment", pay)

# <mark_3>
await ctx.promise("payment.success").value()
# </mark_3>
# <mark_2>
ctx.set("status", "Payment succeeded")
# </mark_2>

async def email():
return await email_client.send_success_notification(payment["email"])
await ctx.run("notify the user", email)

# <mark_2>
ctx.set("status", "User notified of payment success")
# </mark_2>

return "success"
# </mark_1>


# <mark_3>
@payment_workflow.handler()
async def payment_webhook(ctx: WorkflowSharedContext, account: str):
await ctx.promise("payment.success").resolve(account)
# </mark_3>


# <mark_2>
@payment_workflow.handler()
async def status(ctx: WorkflowSharedContext):
await ctx.get("status")
# </mark_2>

app = restate.app([payment_workflow])
# <end_here>
2 changes: 1 addition & 1 deletion code_snippets/python/src/develop/my_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
async def my_service_handler(ctx: Context, greeting: str) -> str:
return f"${greeting}!"

app = restate.app(services=[my_service])
app = restate.app([my_service])
2 changes: 1 addition & 1 deletion code_snippets/python/src/develop/my_virtual_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ async def my_object_handler(ctx: ObjectContext, greeting: str) -> str:
async def my_concurrent_handler(ctx: ObjectSharedContext, greeting: str) -> str:
return f"${greeting} ${ctx.key()}!"

app = restate.app(services=[my_virtual_object])
app = restate.app([my_virtual_object])
5 changes: 1 addition & 4 deletions code_snippets/python/src/develop/my_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,4 @@ async def interact_with_workflow(ctx: WorkflowSharedContext, req: str):
# ... implement interaction logic here ...
return

app = restate.app(services=[my_workflow])


Serde
app = restate.app([my_workflow])
Loading

0 comments on commit bc73d3e

Please sign in to comment.