Commit d54d99a: Adding helper methods to build dags

patkivikram committed Oct 25, 2022
1 parent 649234f commit d54d99a

Showing 5 changed files with 518 additions and 157 deletions.
3 changes: 2 additions & 1 deletion .markdownlint.json
@@ -13,5 +13,6 @@
"front_matter_title": ""
},
"MD041": false,
"MD022": false
"MD022": false,
"MD009": false
}
281 changes: 201 additions & 80 deletions README.md
@@ -11,100 +11,221 @@
**Dagger** is a distributed, scalable, durable, and highly available orchestration engine to execute asynchronous and
synchronous long-running business logic in a scalable and resilient way.
Dagger requires Python 3.7 or later for the new `async/await` syntax and variable type annotations.

## Pizza Ordering and Delivery Workflow Example
Here's an example of how to use the library to build and run a Pizza Ordering Workflow:

![Pizza Workflow](docs/images/workflow.jpg)

The PizzaWorkflow consists of two processes:

* Order: Responsible for communicating with the order service to place a pizza order (CommandTask) and to wait for
the order to be ready (ListenerTask)
* Delivery: Once the order is ready, this process communicates with the delivery service to start delivery of the
pizza order (CommandTask)

### Step 1 Instantiate Dagger

```python
import logging

from dagger.service.services import Dagger
from dagger.modeler.definition import *
from dagger.templates.template import *

logger = logging.getLogger(__name__)

# KAFKA_ADMIN_CLIENT_URL and aerospike_config are placeholders assumed to be
# supplied by your application's configuration.
workflow_engine = Dagger(
    broker=KAFKA_ADMIN_CLIENT_URL,
    store="aerospike://",
    consumer_auto_offset_reset="latest",
    task_update_topic="task_update_topic",
    trigger_interval=600,
    aerospike_config=aerospike_config,
    enable_changelog=False,
    web_port=6066,
    serializer="raw",
)
```
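A few notes on these arguments: `broker` points Dagger at the Kafka cluster it uses for its task and bookkeeping topics, `store="aerospike://"` selects Aerospike as the durable state store for workflow instances, and `serializer="raw"` leaves payload encoding to the application. `KAFKA_ADMIN_CLIENT_URL` and `aerospike_config` are placeholders assumed to come from your application's configuration.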

### Step 2 Define Leaf Command and Listener Tasks For Order Process

Let's assume that the Order microservice processes incoming orders over a Kafka topic, `pizza_order_topic`, with the
following JSON schema:

```json
{
"order_id": "id",
"pizza_type": "1",
"customer_id": "customer_id"
}
```

Using this information, let's build out the `OrderCommandTask` by overriding the `execute` method to implement the
business logic for sending the payload to the Order service over a Kafka topic:

```python
import json
from typing import Dict

# Import paths may vary by Dagger version.
from dagger.tasks.task import ITemplateDAGInstance, KafkaCommandTask


class OrderCommandTask(KafkaCommandTask[str, str]):
    async def execute(
        self,
        runtime_parameters: Dict[str, str],
        workflow_instance: ITemplateDAGInstance,
    ) -> None:
        payload = {
            "order_id": runtime_parameters["order_id"],
            "customer_id": runtime_parameters["customer_id"],
            "pizza_type": runtime_parameters["pizza_type"],
        }
        # Publish the command payload on the topic this task is bound to.
        await workflow_engine.topics[self.topic].send(
            value=json.dumps(payload)
        )
```
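Note that `self.topic` resolves to the topic this task is bound to when the process template is assembled; in this example it is `pizza_order_topic`, wired in by the process builder shown later in this step.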

After executing the `OrderCommandTask`, the workflow should enter a `WAIT_STATE` until it receives a message from the
Order service about the status of the order. Let's assume that the Order service sends a message on a Kafka topic,
`order_status_topic`, when the order is ready, in the following JSON format:

```json
{
"order_id": "id",
"status": "READY"
}
```

Let's model the `PizzaWaitForReadyListener` to process this message on the `order_status_topic` by implementing the
`get_correlatable_keys_from_payload` and `on_message` methods on the listener. It also needs to set `correlatable_key`
to `order_id` so that Dagger can look up the right workflow instance from the payload:

```python
from typing import Any, Dict, List

# Import paths may vary by Dagger version; VT is Dagger's value type variable.
from dagger.tasks.task import VT, KafkaListenerTask, TaskLookupKey


class PizzaWaitForReadyListener(KafkaListenerTask[str, str]):
    correlatable_key = "order_id"

    async def get_correlatable_keys_from_payload(
        self, payload: Any
    ) -> List[TaskLookupKey]:
        tpayload = json.loads(payload)
        key = tpayload[self.correlatable_key]
        return [(self.correlatable_key, key)]

    async def on_message(
        self, runtime_parameters: Dict[str, VT], *args: Any, **kwargs: Any
    ) -> bool:
        logger.info("Pizza Order is Ready")
        return True
```
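To make the correlation concrete, here is a minimal, Dagger-free sketch of what the listener extracts from a status message, assuming the JSON shape above:

```python
import json

# A status message as the order service might publish it:
payload = '{"order_id": "42", "status": "READY"}'

# get_correlatable_keys_from_payload turns this into (key, value) lookup
# tuples, which Dagger uses to find the matching workflow instance:
tpayload = json.loads(payload)
lookup_keys = [("order_id", tpayload["order_id"])]
assert lookup_keys == [("order_id", "42")]
```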

When the order service sends a status message on the `order_status_topic`, Dagger invokes `get_correlatable_keys_from_payload`
to determine which workflow instance the message belongs to. Once it finds the matching instance, it invokes
`on_message` on the corresponding listener task.

Now that we have the leaf tasks modeled, let's attach them to the parent `Order` process:
```python
def pizza_ordering_process(
    process_name: str = "Order",
) -> IProcessTemplateDAGBuilder:
    dag_builder = DAGBuilderHelper(dagger_app=workflow_engine)
    root_task = dag_builder.build_and_link_tasks(
        [
            dag_builder.generic_command_task_builder(
                topic="pizza_order_topic",
                task_type=OrderCommandTask,
                process_name=process_name,
            ),
            dag_builder.generic_listener_task_builder(
                topic="order_status_topic",
                task_type=PizzaWaitForReadyListener,
                process_name=process_name,
            ),
        ]
    )
    return dag_builder.generic_process_builder(
        process_name=process_name, root_task=root_task
    )
```
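`build_and_link_tasks` chains the tasks in the order listed, so the command task becomes the root of the `Order` process and the listener waits behind it, matching the ordering described above.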

The `Order` process is `COMPLETED` when both the `OrderCommandTask` and the `PizzaWaitForReadyListener` are `COMPLETED`;
the workflow then transitions to execute the next process, `Delivery`.

### Step 3 Define Leaf Command Tasks For Delivery Process

Let's assume that the delivery service just requires an HTTP POST request with the following schema:

```json
{
"order_id": "id",
"customer_id": "customer_id"
}
```

We can model the `DeliveryCommandTask` to POST this payload by implementing the `execute` method as follows:

```python
from typing import Dict

import aiohttp

# Import paths may vary by Dagger version.
from dagger.tasks.task import VT, ExecutorTask, ITask


class DeliveryCommandTask(ExecutorTask[str, str]):
    async def execute(
        self, runtime_parameters: Dict[str, VT], workflow_instance: ITask = None
    ) -> None:
        payload = {
            "order_id": runtime_parameters["order_id"],
            "customer_id": runtime_parameters["customer_id"],
        }
        # POST the payload to the (example) delivery service endpoint.
        async with aiohttp.ClientSession() as session:
            async with session.post(url="http://www.deliverysvc.com", json=payload):
                pass
```
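Unlike the `OrderCommandTask`, this task extends `ExecutorTask`: instead of publishing a command on a Kafka topic, its `execute` method runs arbitrary logic, in this case an HTTP POST via `aiohttp`.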

Let's attach this task to the parent `Delivery` process:

```python
def pizza_delivery_process(
    process_name: str = "Delivery",
) -> IProcessTemplateDAGBuilder:
    dag_builder = DAGBuilderHelper(dagger_app=workflow_engine)
    root_task = dag_builder.build_and_link_tasks(
        [
            dag_builder.generic_executor_task_builder(
                task_type=DeliveryCommandTask,
                name=process_name,
            )
        ]
    )
    return dag_builder.generic_process_builder(
        process_name=process_name, root_task=root_task
    )
```

### Step 4 Define the Sequence of Process Execution and Register the Workflow Definition Using `register_template`

Based on the workflow, we want the `Order` process to execute before the `Delivery` process. The workflow ensures
that the `Delivery` tasks execute only after both tasks in the `Order` process reach a terminal state:

```python
@Dagger.register_template("PizzaWorkflow")
def register_pizza_workflow(template_name: str) -> ITemplateDAG:
    dag_builder_helper = DAGBuilderHelper(workflow_engine)
    order_process = dag_builder_helper.build_and_link_processes(
        [
            pizza_ordering_process(process_name="Order"),
            pizza_delivery_process(process_name="Delivery"),
        ]
    )
    return dag_builder_helper.generic_template(
        template_name=template_name, root_process=order_process
    )
```
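An application can define as many workflow DAGs as it needs this way; Dagger picks up and registers every template in the codebase decorated with `register_template`.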

### Step 5 Define an API to Instantiate and Execute Pizza Order Workflows

```python
import uuid


async def create_and_submit_pizza_delivery_workflow(
    order_id: str, customer_id: str, pizza_type: int
):
    pizza_workflow_template = workflow_engine.template_dags["PizzaWorkflow"]
    pizza_workflow_instance = await pizza_workflow_template.create_instance(
        uuid.uuid1(),
        repartition=False,  # create this instance on the current worker
        order_id=order_id,
        customer_id=customer_id,
        pizza_type=pizza_type,
    )
    await workflow_engine.submit(pizza_workflow_instance, repartition=False)
```
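Here is a hypothetical caller, for illustration only, showing the shape of this API; in a real service the order details would come from an order-intake endpoint:

```python
# Hypothetical caller; a real one would pass actual order details.
async def handle_new_order() -> None:
    await create_and_submit_pizza_delivery_workflow(
        order_id="42", customer_id="customer-1", pizza_type=1
    )
```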

### Step 6 Start the Worker

```python
workflow_engine.main()
```
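`main()` blocks and runs the Dagger worker, which consumes the configured topics and drives the registered workflows until the process is stopped.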

## Dagger is

Binary file added docs/images/workflow.jpg