Commit

adding documentation support

patkivikram committed Oct 17, 2022
1 parent 19973b5 commit 41f397a
Showing 33 changed files with 1,351 additions and 987 deletions.
26 changes: 26 additions & 0 deletions .bumpversion.cfg
@@ -0,0 +1,26 @@
[bumpversion]
current_version = 0.1.0
commit = True
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-(?P<pre_release>[a-z]+))?
serialize =
{major}.{minor}.{patch}-{pre_release}
{major}.{minor}.{patch}

[bumpversion:file:dagger/__init__.py]

[bumpversion:file:docs/index.md]
search = Dagger - {current_version}
replace = Dagger - {new_version}

[bumpversion:file:setup.cfg]
search = version = {current_version}
replace = version = {new_version}

[bumpversion:part:pre_release]
optional_value = final
values =
final
dev
alpha
beta
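
As a quick illustration (an editorial sketch, not part of the commit), the `parse` pattern above accepts both serialized forms defined in this config:

```python
import re

# the parse pattern from the config above
PARSE = r"(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-(?P<pre_release>[a-z]+))?"

for version in ("0.1.0", "0.1.0-alpha"):
    match = re.fullmatch(PARSE, version)
    print(version, "->", match.groupdict() if match else "no match")
# 0.1.0 -> {'major': '0', 'minor': '1', 'patch': '0', 'pre_release': None}
# 0.1.0-alpha -> {'major': '0', 'minor': '1', 'patch': '0', 'pre_release': 'alpha'}
```
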
50 changes: 25 additions & 25 deletions .github/workflows/main.yml
@@ -138,31 +138,31 @@ jobs:
# - name: Validate docs
# run: ./docker/validate_docs.sh
#
build-docs:
  runs-on: ubuntu-latest
  if: github.event_name == 'pull_request'
  steps:
    - name: Check out code
      uses: actions/checkout@v3

    - uses: actions/setup-python@v4  # exact version garbled in the source page; v4 assumed
      with:
        python-version: ${{ env.PYTHON_VERSION }}

    - name: Install dependencies
      uses: ./.github/actions/install-dependencies
      with:
        requirements: "true"
        docs-requirements: "true"

    - name: Build Docs
      run: mkdocs build --strict

    - name: Upload docs site artifact
      uses: actions/upload-artifact@v3
      with:
        name: docs-site
        path: site/
#
# update-dev-docs:
# runs-on: ubuntu-latest
195 changes: 12 additions & 183 deletions README.md
@@ -19,7 +19,7 @@ from dagger.templates.template import *

logger = logging.getLogger(__name__)

workflow_engine = Dagger(broker="kafka://localhost:9092", store="aerospike://", datadir="/tmp/data/")

@Dagger.register_template('OrderWorkflow')
def order_template(template_name: str) -> ITemplateDAG:
@@ -72,207 +72,36 @@ workflow_engine.main()

The ``register_template`` decorator defines a "DAG processor" that essentially defines the various processes and child
tasks the DAG executes. In the example above, the code creates a named template ``OrderWorkflow`` and associates
a ``PAYMENT`` process with two child tasks: ``PAYMENT_LISTENER`` using ``PaymentKafkaListenerTask`` and ``PAYMENT_COMMAND``
using the ``PaymentKafkaCommandTask`` definition. The ``SHIPPING`` process follows
after the ``PAYMENT`` process with similarly named topics and processes, and the template defines the root process and
links them in a DAG (Directed Acyclic Graph) structure.

The application can define as many DAGs as it needs to model using the ``register_template``
decorator. dagger populates all the DAG templates in the codebase decorated with the `register_template` decorator.

Here's an example of how to create an instance of a specific DAG:

```python
from typing import Dict
from uuid import uuid1

template = workflow_engine.get_template('OrderWorkflow')
runtime_parameters: Dict[str, str] = dict()
runtime_parameters['customer_name'] = 'EXAMPLE_CUSTOMER'
runtime_parameters['order_number'] = 'EXAMPLE_ORDER'
workflow_instance = await template.create_instance(uuid1(), runtime_parameters)
```

To begin execution of the DAG instance created above

```python
await workflow_engine.submit(workflow_instance)
```

This begins the actual execution of the tasks created by the template definition and executes them in the sequence as
defined in the template. The engine currently supports the following types of tasks:

## KafkaCommandTask

This task is used to send a request/message on a Kafka topic defined using the template builder. This type of task is a
child task in the execution graph and can be extended by implementing the method

```python
@abc.abstractmethod
async def execute(self) -> None:
...
```
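
As a hedged sketch of what a concrete command task might look like (the import path, `topic` attribute, and payload shape are assumptions for illustration, not taken from this commit):

```python
from typing import Any

from dagger.tasks.task import KafkaCommandTask  # assumed import path


class PaymentKafkaCommandTask(KafkaCommandTask[str, str]):
    """Hypothetical command task that publishes a payment request."""

    async def execute(self) -> None:
        # self.topic and self.runtime_parameters are assumed attributes;
        # consult the dagger source for the exact task contract
        payload: Any = {"order_number": self.runtime_parameters.get("order_number")}
        await self.topic.send(value=payload)
```
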

## KafkaListenerTask

This task waits/halts the execution of the DAG until a message is received on the defined Kafka topic (in the template
definition). Each task created using the DAG builder defines a durable key to correlate each received message on the
topic against listener tasks. The engine handles the complexity of invoking the appropriate task instance based on the
key in the payload.

A listener task needs to implement the following methods

```python

@abc.abstractmethod
async def on_message(self, *args: Any, **kwargs: Any) -> None:
...

@abc.abstractmethod
async def get_correlatable_key(self, payload: Any) -> TaskLookupKey:
...
```

The `get_correlatable_key` method extracts the key by parsing the payload received on the Kafka topic. Using this key,
dagger looks up the appropriate task from the list of tasks waiting on this event and invokes `on_message` on each one
of them. The default implementation of this task just sets the task's status to `COMPLETED`.
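
A hedged sketch of a concrete listener (the import path and the payload shape are assumptions for illustration):

```python
import logging
from typing import Any

from dagger.tasks.task import KafkaListenerTask, TaskLookupKey  # assumed import path

logger = logging.getLogger(__name__)


class PaymentKafkaListenerTask(KafkaListenerTask[str, str]):
    """Hypothetical listener correlated on the order number."""

    async def get_correlatable_key(self, payload: Any) -> TaskLookupKey:
        # assumes the payload is a mapping that carries the durable key
        return "order_number", payload["order_number"]

    async def on_message(self, *args: Any, **kwargs: Any) -> None:
        # the default implementation simply marks the task COMPLETED;
        # custom validation or enrichment logic would go here
        logger.info("payment event received: %s %s", args, kwargs)
```
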

The engine provides the flexibility to implement any other type of listener task by implementing the following interface

```python
class SensorTask(ITask[KT, VT]):
```

along with a custom `TaskTemplateBuilder`

```python
class TaskTemplateBuilder:
    app: Service

    def __init__(self, app: Service) -> None:
        self.app = app

    @abc.abstractmethod
    def set_type(self, task_type: Type[ITask]) -> TaskTemplateBuilder:
        ...

    @abc.abstractmethod
    def build(self) -> TaskTemplate:
        ...

```
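
A custom builder for such a sensor task might take roughly this shape (a sketch against the interface above; the stored-state details are assumptions):

```python
from typing import Optional, Type


class SensorTemplateBuilder(TaskTemplateBuilder):
    """Hypothetical builder that wires a custom SensorTask into a template."""

    def __init__(self, app: Service) -> None:
        super().__init__(app)
        self._task_type: Optional[Type[ITask]] = None

    def set_type(self, task_type: Type[ITask]) -> "TaskTemplateBuilder":
        self._task_type = task_type
        return self

    def build(self) -> TaskTemplate:
        assert self._task_type is not None, "set_type must be called before build"
        # constructing the concrete TaskTemplate depends on the dagger version;
        # see the built-in builders in the source for reference
        ...
```
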

## TriggerTask

This task waits/halts the execution of the DAG until the current time >= the trigger time on the task.

A trigger task needs to implement the following method

```python
@abc.abstractmethod
async def execute(self) -> None:
...
```

The engine provides a `TriggerTaskTemplateBuilder` helper to model the task in the DAG.
The `set_time_to_execute_lookup_key` method on this builder defines the key used to look up the trigger time provided in
the runtime parameters of the task.
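
A hedged sketch of the wiring (the concrete task class and the lookup key name are assumptions for illustration):

```python
# inside a template definition; `app` is the underlying Service instance
trigger_builder = TriggerTaskTemplateBuilder(app)
trigger_builder.set_type(DefaultTriggerTask)  # assumed concrete trigger task class
trigger_builder.set_time_to_execute_lookup_key("execute_at")  # key in runtime parameters
trigger_template = trigger_builder.build()
```
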

## DecisionTask

This type of task is similar to a `switch..case` statement in a programming language. It returns the next task to
execute based on the execution logic. A decision task needs to implement

```python
@abc.abstractmethod
async def evaluate(self, **kwargs: Any) -> Optional[UUID]:
...
```

This method returns the UUID of the next task to execute in the execution path.

The Engine provides a `DecisionTaskTemplateBuilder` to model a decision task in the DAG
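
For illustration, a decision task that routes on a runtime parameter might look like this (the import path, attributes, and branch ids are assumptions):

```python
from typing import Any, Optional
from uuid import UUID

from dagger.tasks.task import DecisionTask  # assumed import path


class ShippingDecisionTask(DecisionTask[str, str]):
    """Hypothetical decision node choosing between two shipping branches."""

    async def evaluate(self, **kwargs: Any) -> Optional[UUID]:
        # runtime_parameters and the branch task ids are assumed attributes
        if self.runtime_parameters.get("priority_shipping") == "true":
            return self.expedited_task_id
        return self.standard_task_id
```
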

## RESTful API

The framework provides a RESTful API to retrieve the status of root task instances. A root task is the instance created
using the `TaskTemplate`, which then has multiple chained ProcessTasks and child tasks (KafkaCommand and KafkaListener
tasks).

`GET http://<hostname>:6066/tasks/instances`

```json
[
{
"child_dags": [],
"child_tasks": [
{
"child_dags": [
"89bbf26c-0727-11ea-96e5-0242ac150004",
"89bc1486-0727-11ea-96e5-0242ac150004"
],
"correlatable_key": null,
"id": "89bbedd0-0727-11ea-96e5-0242ac150004",
"lastupdated": 1573767727,
"parent_id": "89bbe43e-0727-11ea-96e5-0242ac150004",
"process_name": "PAYMENT",
"runtime_parameters": {
"order_number": "ID000",
"customer": "ID000"
},
"status": {
"code": "COMPLETED",
"value": "Complete"
},
"task_type": "NON_ROOT",
"time_completed": 1573767727,
"time_created": 1573767624,
"time_submitted": 1573767698
},
{
"child_dags": [
"89bc3984-0727-11ea-96e5-0242ac150004",
"89bc482a-0727-11ea-96e5-0242ac150004"
],
"correlatable_key": null,
"id": "89bc35f6-0727-11ea-96e5-0242ac150004",
"lastupdated": 1573767727,
"parent_id": "89bbe43e-0727-11ea-96e5-0242ac150004",
"process_name": "SHIPPING",
"runtime_parameters": {
"order_number": "ID000",
"customer": "ID000"
},
"status": {
"code": "EXECUTING",
"value": "Executing"
},
"task_type": "NON_ROOT",
"time_completed": 0,
"time_created": 1573767624,
"time_submitted": 1573767727
}
],
"correlatable_key": null,
"id": "89bbe43e-0727-11ea-96e5-0242ac150004",
"lastupdated": 1573767624,
"parent_id": null,
"runtime_parameters": {
"order_number": "ID000",
"customer": "ID000"
},
"status": {
"code": "EXECUTING",
"value": "Executing"
},
"task_type": "ROOT",
"time_completed": 0,
"time_created": 1573767624,
"time_submitted": 1573767698
}]

```
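
A minimal client sketch against this endpoint (host, port, and field names follow the example response above):

```python
import requests

resp = requests.get("http://localhost:6066/tasks/instances")
resp.raise_for_status()
for instance in resp.json():
    print(instance["id"], instance["status"]["code"])
```
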

Dagger supports any type of stream data: bytes, Unicode and serialized structures, but also comes with "Models" that use
modern Python syntax to describe how keys and values in streams are serialized. For more details on supported models,
refer to <https://faust-streaming.github.io/faust/userguide/models.html>.
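
For example, a minimal model describing the payloads used earlier might be declared like this (an editorial sketch, not part of the commit):

```python
import faust


class OrderPayload(faust.Record):
    order_number: str
    customer_name: str
```
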

## OpenTelemetry

@@ -313,7 +142,7 @@ To install using `pip`:
pip install py-dagger
```

dagger has a dependency on `faust-streaming` for Kafka stream processing.

## FAQ

