Implementing stop feature on workflow
patkivikram committed Mar 28, 2023
1 parent bc0d486 commit 058a4df
Showing 9 changed files with 180 additions and 9 deletions.
2 changes: 1 addition & 1 deletion dagger/__init__.py
@@ -1,3 +1,3 @@
-__version__ = "0.1.4"
+__version__ = "0.1.5"
__author__ = "Vikram Patki <[email protected]>"
__all__ = [] # type: ignore
4 changes: 2 additions & 2 deletions dagger/modeler/builder_helper.py
@@ -141,7 +141,7 @@ def generic_command_task_builder(
value_serializer=value_serializer,
)
command_task_builder = KafkaCommandTaskTemplateBuilder(self.app)
-command_task_builder.set_topic(command_topic)
+command_task_builder.set_topic(command_topic)  # type: ignore
command_task_builder.set_type(task_type)
command_task_builder.set_name(f"{process_name}_command_task")
return command_task_builder
@@ -178,7 +178,7 @@ def generic_listener_task_builder(
value_serializer=value_serializer,
)
listener_task_builder = KafkaListenerTaskTemplateBuilder(self.app)
-listener_task_builder.set_topic(listener_topic)
+listener_task_builder.set_topic(listener_topic)  # type: ignore
listener_task_builder.set_concurrency(concurrency=concurrency)
listener_task_builder.set_type(task_type)
listener_task_builder.set_name(f"{process_name}_listener_task")
23 changes: 22 additions & 1 deletion dagger/tasks/task.py
@@ -945,7 +945,11 @@ class INonLeafNodeTask(ITask[KT, VT], abc.ABC):
task_type: str = TaskType.SUB_DAG.name

async def stop(self) -> None:
-    pass
+    self.status = TaskStatus(
+        code=TaskStatusEnum.STOPPED.name,
+        value=TaskStatusEnum.STOPPED.value,
+    )
+    await super().stop()

async def start(self, workflow_instance: Optional[ITemplateDAGInstance]) -> None:
if self.status.code in [
@@ -1311,6 +1315,23 @@ class DefaultTemplateDAGInstance(ITemplateDAGInstance[str, str]):
Default Implementation of ITemplateDAGInstance
"""

+    async def stop(self) -> None:
+        remaining_tasks: Optional[List[ITask]] = await self.get_remaining_tasks(
+            next_dag_id=self.root_dag, workflow_instance=self, tasks=[]  # type: ignore
+        )  # type: ignore
+        if remaining_tasks:
+            for task in remaining_tasks:
+                if task.status.code == TaskStatusEnum.EXECUTING.name:
+                    await task.stop()
+                if task.status.code not in TERMINAL_STATUSES:
+                    task.status = TaskStatus(
+                        code=TaskStatusEnum.STOPPED.name,
+                        value=TaskStatusEnum.STOPPED.value,
+                    )
+
+        await super().stop()
+        await dagger.service.services.Dagger.app._update_instance(task=self)  # type: ignore

def get_correlatable_key(self, payload: Any) -> TaskLookupKey:
"""Not implemented.
4 changes: 2 additions & 2 deletions docker-compose.yaml
@@ -31,7 +31,7 @@ services:
- dagger_py

zookeeper:
image: "confluentinc/cp-zookeeper"
image: confluentinc/cp-zookeeper:7.3.2
hostname: zookeeper
environment:
- ZOOKEEPER_CLIENT_PORT=2181
@@ -45,7 +45,7 @@

kafka:
# pinned due to https://github.com/confluentinc/kafka-images/issues/127
-    image: confluentinc/cp-kafka:7.0.0
+    image: confluentinc/cp-kafka:7.3.2
hostname: kafka
container_name: kafka
ports:
18 changes: 18 additions & 0 deletions docs/usage-guide/fundamentals.md
@@ -205,6 +205,24 @@ which then has multiple, chained ProcessTasks and child tasks(KafkaCommand and K

```

## How to stop a workflow

### Step 1. Implement the `stop` method in the task modeled in the workflow

```python
async def stop(self) -> None:
print("Stop called")
```
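
In practice, the override is the hook where a task can log or release resources before the engine marks it `STOPPED`. A minimal sketch along the lines of the integration tests follows; the import path and the `self.id` log line are assumptions based on the repository layout, not a prescribed API:

```python
from dagger.tasks.task import KafkaListenerTask


class SimpleKafkaListenerTask(KafkaListenerTask[str, str]):
    correlatable_key = "simple_id"

    async def stop(self) -> None:
        # Hypothetical cleanup hook: release any resources the task holds.
        # dagger only requires that stop() be defined on the task.
        print(f"Stop called on task {self.id}")
```
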
### Step 2. Invoke `stop` on the workflow instance

```python
workflow_instance: ITemplateDAGInstance = await dagger_instance.get_instance(id=workflow_id)
await workflow_instance.stop()
```

This invokes `stop` on every task in the `EXECUTING` state and marks any task not yet in a terminal state as `STOPPED`.
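
Once `stop` returns, the workflow instance itself is persisted in the `STOPPED` state, so the outcome can be verified with a check like the sketch below (assuming the same `dagger_instance` and `workflow_id` as above, and that `TaskStatusEnum` is importable from `dagger.tasks.task`):

```python
from dagger.tasks.task import TaskStatusEnum

# Re-fetch the instance and confirm it reached the STOPPED state.
workflow_instance = await dagger_instance.get_instance(id=workflow_id)
assert workflow_instance.status.code == TaskStatusEnum.STOPPED.name
```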


## Detailed Sections

* [getting-started][getting-started]
Expand Down
15 changes: 14 additions & 1 deletion integration_tests/test_app.py
@@ -106,6 +106,9 @@
topic_list.append(
NewTopic(name="simple_topic", num_partitions=1, replication_factor=1)
)
+topic_list.append(
+    NewTopic(name="simple_topic_stop", num_partitions=1, replication_factor=1)
+)
topic_list.append(
NewTopic(name="SIMPLE_LISTENER", num_partitions=1, replication_factor=1)
)
@@ -139,6 +142,8 @@
workflow_engine.tables_cleared = False
orders_topic = workflow_engine.faust_app.topic(ORDERS_TOPIC, value_type=str)
simple_topic = workflow_engine.faust_app.topic("simple_topic", value_type=str)
+simple_topic_stop = workflow_engine.faust_app.topic("simple_topic_stop", value_type=str)

simple_listener = workflow_engine.faust_app.topic("SIMPLE_LISTENER", value_type=str)

templates: List[ITemplateDAGInstance] = list()
@@ -354,7 +359,7 @@ class SimpleKafkaListenerTask(KafkaListenerTask[str, str]):
correlatable_key = "simple_id"

async def stop(self) -> None:
-    pass
+    print("Stop called")

def get_status(self) -> TaskStatus:
return self.status
@@ -705,6 +710,14 @@ async def create_and_submit_pizza_delivery_workflow(
await workflow_engine.submit(pizza_workflow_instance, repartition=False)


+@workflow_engine.faust_app.agent(simple_topic_stop)
+async def simple_data_stream_stop(stream):
+    async for value in stream:
+        instance = await workflow_engine.get_instance(running_task_ids[-1])
+        await instance.stop()


@workflow_engine.faust_app.agent(simple_topic)
async def simple_data_stream(stream):
async for value in stream:
49 changes: 48 additions & 1 deletion integration_tests/test_dagger.py
@@ -55,7 +55,7 @@ def workflow_consumer():
"task_update_topic",
bootstrap_servers=KAFKA_ADMIN_CLIENT_URL,
group_id=f"workflow_{uuid.uuid1()}",
-    consumer_timeout_ms=2000,
+    consumer_timeout_ms=20000,
auto_offset_reset="earliest",
)
return workflow_consumer
@@ -266,3 +266,50 @@ def test_workflow_update(workflow_consumer):
assert executing_count > 1
assert completed_count >= 1
assert workflow_count > 1


+SIMPLE_ID_TO_DELETE = "SIMPLE_ID_3"
+
+
+def test_create_new_simple():
+    producer = KafkaProducer(bootstrap_servers=[KAFKA_ADMIN_CLIENT_URL])
+    future = producer.send(
+        topic="simple_topic", value=json.dumps(SIMPLE_ID_TO_DELETE).encode("utf-8")
+    )
+    # Block for 'synchronous' sends
+    future.get(timeout=10)
+    producer.flush()
+
+
+@retry(AssertionError, tries=30, delay=1)
+def test_new_simple_workflow_instances_created():
+    response = requests.get(DAGGER_URL + "/tasks/instances")
+    assert response.status_code == requests.codes.ok
+    engine_response = json.loads(response.content)
+    executing_counter = 0
+    for instance in engine_response:
+        if instance["status"]["code"] == "EXECUTING":
+            executing_counter += 1
+    assert executing_counter == 1
+
+
+def test_stop_new_simple():
+    producer = KafkaProducer(bootstrap_servers=[KAFKA_ADMIN_CLIENT_URL])
+    future = producer.send(
+        topic="simple_topic_stop", value=json.dumps(SIMPLE_ID_TO_DELETE).encode("utf-8")
+    )
+    # Block for 'synchronous' sends
+    future.get(timeout=10)
+    producer.flush()
+
+
+@retry(AssertionError, tries=30, delay=1)
+def test_new_simple_workflow_instances_stopped():
+    response = requests.get(DAGGER_URL + "/tasks/instances")
+    assert response.status_code == requests.codes.ok
+    engine_response = json.loads(response.content)
+    stopped_counter = 0
+    for instance in engine_response:
+        if instance["status"]["code"] == "STOPPED":
+            stopped_counter += 1
+    assert stopped_counter == 1
2 changes: 1 addition & 1 deletion requirements-test.txt
@@ -10,4 +10,4 @@ pytest-cov==4.0.0
pytest-mock==3.10.0
pytest-asyncio==0.14.0
pytest-runner==4.2
-confluent_kafka==1.9.2
+kafka-python>=2.0.2
72 changes: 72 additions & 0 deletions tests/tasks/test_task.py
@@ -323,6 +323,78 @@ async def test_parallel_composite_task_notify_atleast_one_one_complete(
)
assert parallel_composite_task_fixture.on_complete.called

+    @pytest.mark.asyncio
+    async def test_stop_workflow(
+        self, template_fixture, executor_fixture, sensor_fixture, decision_fixture
+    ):
+        first_process = DefaultProcessTemplateDAGInstance(uuid1())
+        second_process = DefaultProcessTemplateDAGInstance(uuid1())
+        template_fixture.root_dag = first_process.id
+        template_fixture.add_task(first_process)
+        first_process.root_dag = executor_fixture.id
+        template_fixture.add_task(executor_fixture)
+        second_process.root_dag = decision_fixture.id
+        template_fixture.add_task(decision_fixture)
+        template_fixture.add_task(sensor_fixture)
+        template_fixture.add_task(second_process)
+        first_process.next_dags = [second_process.id]
+        second_process.next_dags = []
+        first_process.stop = CoroutineMock()
+        second_process.stop = CoroutineMock()
+        executor_fixture.stop = CoroutineMock()
+        sensor_fixture.stop = CoroutineMock()
+        decision_fixture.stop = CoroutineMock()
+        template_fixture.status = TaskStatus(
+            code=TaskStatusEnum.EXECUTING.name, value=TaskStatusEnum.EXECUTING.value
+        )
+        first_process.status = TaskStatus(
+            code=TaskStatusEnum.EXECUTING.name, value=TaskStatusEnum.EXECUTING.value
+        )
+        sensor_fixture.status = TaskStatus(
+            code=TaskStatusEnum.EXECUTING.name, value=TaskStatusEnum.EXECUTING.value
+        )
+        second_process.status = TaskStatus(
+            code=TaskStatusEnum.NOT_STARTED.name, value=TaskStatusEnum.NOT_STARTED.value
+        )
+        executor_fixture.status = TaskStatus(
+            code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value
+        )
+        decision_fixture.status = TaskStatus(
+            code=TaskStatusEnum.NOT_STARTED.name, value=TaskStatusEnum.NOT_STARTED.value
+        )
+        executor_fixture.next_dags = [sensor_fixture.id]
+        executor_fixture.root_dag = None
+        sensor_fixture.next_dags = []
+        sensor_fixture.root_dag = None
+        decision_fixture.next_dags = []
+        decision_fixture.root_dag = None
+        dagger.service.services.Dagger.app._update_instance = CoroutineMock()
+        await template_fixture.stop()
+        assert template_fixture.status == TaskStatus(
+            code=TaskStatusEnum.STOPPED.name, value=TaskStatusEnum.STOPPED.value
+        )
+        assert first_process.status == TaskStatus(
+            code=TaskStatusEnum.STOPPED.name, value=TaskStatusEnum.STOPPED.value
+        )
+        assert second_process.status == TaskStatus(
+            code=TaskStatusEnum.STOPPED.name, value=TaskStatusEnum.STOPPED.value
+        )
+        assert executor_fixture.status == TaskStatus(
+            code=TaskStatusEnum.COMPLETED.name, value=TaskStatusEnum.COMPLETED.value
+        )
+        assert decision_fixture.status == TaskStatus(
+            code=TaskStatusEnum.STOPPED.name, value=TaskStatusEnum.STOPPED.value
+        )
+        assert first_process.stop.called
+        assert not second_process.stop.called
+        assert not executor_fixture.stop.called
+        assert not decision_fixture.stop.called
+        assert sensor_fixture.stop.called
+        assert dagger.service.services.Dagger.app._update_instance.called
@pytest.mark.asyncio
async def test_get_remaining_tasks(
self, template_fixture, executor_fixture, sensor_fixture, decision_fixture
