
Commit

feature: get started with basic worker
ClemDoum committed Dec 18, 2024
1 parent f68ccde commit b8b089f
Showing 12 changed files with 221 additions and 15 deletions.
1 change: 1 addition & 0 deletions docs/get-started/build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Build your worker image (WIP)
41 changes: 41 additions & 0 deletions docs/get-started/implement/index.md
@@ -0,0 +1,41 @@
# How to use the worker template?

The [datashare-python](https://github.com/ICIJ/datashare-python) repository is meant to be used as a template to implement your own Datashare worker.

## Clone the template repository

Start by cloning the [template repository](https://github.com/ICIJ/datashare-python):

<!-- termynal -->
```console
$ git clone [email protected]:ICIJ/datashare-python.git
---> 100%
```

## Explore the codebase

In addition to being used as a template, the repository also showcases some of the advanced patterns detailed in the
[guides](../../guides/index.md) section of this documentation.

Don't hesitate to have a look at the codebase before starting (or come back to it later on)!

In particular the following files should be of interest:
```console
.
├── ml_worker
│   ├── app.py
│   ├── config.py
│   ├── tasks
│   │   ├── __init__.py
│   │   ├── classify_docs.py
│   │   ├── dependencies.py
│   │   └── translate_docs.py
```


## Replace existing tasks with your own

To implement your Datashare worker, the only thing you have to do is **replace the existing tasks with your own and
register them in the `app` variable of the `app.py` file.**

We'll detail how to do so in the [Basic Worker](./worker-basic.md) and [Advanced Worker](./worker-advanced.md) examples.
58 changes: 58 additions & 0 deletions docs/get-started/implement/worker-advanced.md
@@ -0,0 +1,58 @@
# Implement your own Datashare worker

## Clone the template repository

Start by cloning the [template repository](https://github.com/ICIJ/datashare-python):

<!-- termynal -->
```console
$ git clone [email protected]:ICIJ/datashare-python.git
---> 100%
```

## Install dependencies

Install [`uv`](https://docs.astral.sh/uv/getting-started/installation/) and install dependencies:
<!-- termynal -->
```console
$ curl -LsSf https://astral.sh/uv/install.sh | sh
$ uv sync --frozen --group dev
```

## Implement your own tasks

The template repository already contains some examples of tasks performing document translation and classification.
You can keep them for now as models for implementing your own tasks, but you'll eventually want to **get rid of them (and their tests)**.

For the sake of example, let's add a dummy [TF-IDF-based](https://en.wikipedia.org/wiki/Tf%E2%80%93idf) vector store to
Datashare and add different tasks to our [async app](../../learn/concepts-basic.md#app):

- the `create_vectorization_tasks` task scans the Datashare index, fits a [`TfidfVectorizer`](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.TfidfVectorizer.html) and persists it to the filesystem.
It then creates `vectorize_docs` tasks, each grouping a batch of vectorization work
- the `vectorize_docs` task receives a batch of doc IDs, loads the vectorizer, vectorizes the docs and persists the vectors to the filesystem
- the `find_most_similar` task receives a batch of doc IDs and finds their nearest neighbors in the vector database
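Conceptually, the last two tasks fit together like this. The sketch below is a pure-Python stand-in (brute-force cosine similarity, in-memory storage instead of the filesystem, no scikit-learn); all names in it are illustrative, not the template's actual API:

```python
import math

# Toy in-memory "vector store", standing in for the filesystem persistence
# described above (hypothetical, for illustration only).
VECTORS: dict[str, list[float]] = {}


def vectorize_docs(docs: dict[str, list[float]]) -> None:
    """Stand-in for the vectorize_docs task: persist one vector per doc ID."""
    VECTORS.update(docs)


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return dot / norm if norm else 0.0


def find_most_similar(doc_id: str, n_neighbors: int = 1) -> list[str]:
    """Stand-in for the find_most_similar task: brute-force nearest neighbors."""
    query = VECTORS[doc_id]
    others = [(other, cosine(query, vec)) for other, vec in VECTORS.items() if other != doc_id]
    others.sort(key=lambda pair: pair[1], reverse=True)
    return [other for other, _ in others[:n_neighbors]]


vectorize_docs({"doc-a": [1.0, 0.0], "doc-b": [0.9, 0.1], "doc-c": [0.0, 1.0]})
print(find_most_similar("doc-a"))  # → ['doc-b']
```

In the real worker, the brute-force loop would be replaced by whatever index structure your vector store provides; the split into a "persist" task and a "query" task is the part that carries over.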

!!! tip
    The `create_vectorization_tasks` task may seem artificial; however, when the vectorization workload is heavy, it's useful to distribute it across workers.
    Having a first task split a large task into smaller ones allows us to spread the computationally expensive work across several workers.
    In our case it probably adds complexity, but it's here for demonstration purposes.

Learn more about how to implement complex workflows and workload distribution in the [task workflow guide](../../guides/task-workflows.md)!
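The batch-splitting step described above can be sketched as follows. The function name is illustrative; the `n_tasks` heuristic is the one used in the repository's `ml_worker/tasks/classify_docs.py`:

```python
def make_batches(doc_ids: list[str], n_workers: int) -> list[list[str]]:
    """Split doc IDs into batches so each worker gets a comparable share of work.

    Mirrors the heuristic from ml_worker/tasks/classify_docs.py; the function
    itself is a simplified stand-in, not the template's actual API.
    """
    n_docs = len(doc_ids)
    n_tasks = max(n_docs // n_workers, n_docs // (n_workers * 5), 1)
    batch_size = max(n_docs // n_tasks, 1)
    return [doc_ids[i : i + batch_size] for i in range(0, n_docs, batch_size)]


# 10 docs split across 2 workers -> 5 batches of 2 docs each
batches = make_batches([f"doc-{i}" for i in range(10)], n_workers=2)
print([len(b) for b in batches])  # → [2, 2, 2, 2, 2]
```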


### Update dependencies

Let's add [scikit-learn](https://scikit-learn.org/stable/index.html) and [pandas](https://pandas.pydata.org/docs/index.html) as dependencies to our project:
<!-- termynal -->
```console
$ uv add scikit-learn pandas
```

### Update dependency injection

If your tasks require dependencies to be injected (at minimum the config and loggers, but also things like DB clients), update the dependency injection setup accordingly.
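As a rough illustration of the pattern (this is **not** the `icij_worker` dependency-injection API — every name below is hypothetical): dependencies are created once when the worker starts, then looked up by tasks at run time.

```python
from typing import Any

# Toy dependency registry, for illustration only.
_DEPENDENCIES: dict[str, Any] = {}


def config_setup() -> None:
    # Hypothetical config values
    _DEPENDENCIES["config"] = {"vectorizer_path": "vectorizer.joblib"}


def es_client_setup() -> None:
    # A real worker would open e.g. an Elasticsearch client here, using the config
    _DEPENDENCIES["es_client"] = object()


def lookup(name: str) -> Any:
    try:
        return _DEPENDENCIES[name]
    except KeyError as e:
        raise RuntimeError(f"dependency {name!r} was not set up") from e


# Run all setups once, before the worker starts consuming tasks
for setup in (config_setup, es_client_setup):
    setup()

print(lookup("config")["vectorizer_path"])  # → vectorizer.joblib
```

The point of the pattern is that expensive clients are built once per worker process rather than once per task, and that a missing setup fails loudly instead of silently.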

## Test

## Register your tasks in the `app`
53 changes: 53 additions & 0 deletions docs/get-started/implement/worker-basic.md
@@ -0,0 +1,53 @@
# Basic Datashare worker

## Install dependencies

Start by installing [`uv`](https://docs.astral.sh/uv/getting-started/installation/) and dependencies:
<!-- termynal -->
```console
$ curl -LsSf https://astral.sh/uv/install.sh | sh
$ uv sync --frozen --group dev
```

## Implement the `hello_user` task function

As seen in the [task tutorial](../../learn/tasks.md#task-arguments), one of the simplest tasks we can implement takes
the `:::python user: dict | None` argument automatically added by Datashare to all tasks and greets that user.

The function performing this task is the following:

```python
--8<--
basic_app.py:hello_user_fn
--8<--
```

## Register the `hello_user` task

In order to turn our function into a Datashare [task](../../learn/concepts-basic.md#tasks), we have to register it in the
`:::python app` [async app](../../learn/concepts-basic.md#app) variable of the [app.py](../../../ml_worker/app.py) file, using the `:::python @app.task` decorator.

Since we won't use existing tasks, we can also perform some cleaning and get rid of them.
The `app.py` file should hence look like this:

```python title="app.py" hl_lines="9"
--8<--
basic_app.py:app
--8<--
```

The only thing we had to do was use the `:::python @app.task` decorator, provide it with a
`:::python name` to **bind the function to a task name**, and group the task in the `:::python PYTHON_TASK_GROUP = TaskGroup(name="PYTHON")` group.

As detailed [here](../../learn/datashare-app.md#grouping-our-tasks-in-the-python-task-group), using this task group
ensures that when custom tasks are published for execution, they are correctly routed to your custom Python worker and
not to the Java built-in workers running behind Datashare's backend.

## Get rid of unused code

## Next

Now that you have created a basic app, you can either:

- learn how to [build a docker image](../build.md) from it
- learn how to implement a more realistic worker in the [advanced example](./worker-advanced.md)
6 changes: 5 additions & 1 deletion docs/get-started/index.md
@@ -1 +1,5 @@
# Get started (WIP)
# About

This section is a step-by-step guide to creating and deploying your own Datashare tasks.

You might want to [learn](../learn/index.md) the basics before actually starting to implement your own worker.
3 changes: 3 additions & 0 deletions docs/get-started/run.md
@@ -0,0 +1,3 @@
# Run using `docker compose` (WIP)

## Publish tasks
17 changes: 13 additions & 4 deletions docs/learn/datashare-app.md
@@ -53,16 +53,25 @@ since they can't execute these tasks, they will fail.

In order, to assign a task to a specific group, you can use the `:::python group: str | TaskGroup | None ` argument of the `:::python task` decorator.

Our app can hence be updated as follows:

```python title="my_app.py" hl_lines="7 10 15 23"
Read the full guide to [task routing](../guides/task-routing.md) to learn more.

## Adding the mandatory `:::python user: dict | None` argument to all tasks

Datashare systematically adds a `:::python user: dict | None` argument to task arguments.
Since task arguments are forwarded directly to your function, it needs to accept the `user` argument even when unused,
otherwise the task will fail, complaining that the `user` argument was provided as input but is not used by the task.

## Our first Datashare app

The app can hence be updated as follows:

```python title="my_app.py" hl_lines="7 10 11 15 23"
--8<--
hello_world_app_ds.py:app
--8<--
```

Read the full guide to [task routing](../guides/task-routing.md) to learn more.

## Running group-specific workers

As detailed in the [task routing](../guides/task-routing.md) guide, worker pools can be restricted to execute tasks of a given group.
4 changes: 2 additions & 2 deletions docs/learn/index.md
@@ -1,7 +1,7 @@
# About

This section is a tutorial and will guide you through the basic steps of building your own tasks for Datashare **from scratch**.
This section is a tutorial and will guide you through the basic steps theoretically required for building your own tasks for Datashare **from scratch**.

Rest assured, **in practice, you won't have to start anything from scratch**. The [get started](../get-started/index.md) section will show you how to create Datashare tasks by cloning the [datashare-python](https://github.com/ICIJ/datashare-python) template repo.
Don't worry, **in practice, you won't have to start anything from scratch**. The [get started](../get-started/index.md) section will show you how to create Datashare tasks by cloning the [datashare-python](https://github.com/ICIJ/datashare-python) template repo.

However, this section will teach you the basic steps it took to build this template.
23 changes: 23 additions & 0 deletions docs/src/basic_app.py
@@ -0,0 +1,23 @@
# --8<-- [start:app]
from icij_worker import AsyncApp
from icij_worker.app import TaskGroup

app = AsyncApp("some-app")

PYTHON_TASK_GROUP = TaskGroup(name="PYTHON")


@app.task(name="hello_user", group=PYTHON_TASK_GROUP)
# --8<-- [start:hello_user_fn]
def hello_user(user: dict | None) -> str:
    greeting = "Hello "
    if user is None:
        user = "unknown"
    else:
        user = user["id"]
    return greeting + user


# --8<-- [end:hello_user_fn]
# --8<-- [end:app]
10 changes: 7 additions & 3 deletions docs/src/hello_world_app_ds.py
@@ -12,28 +12,32 @@

# --8<-- [end:create_app]
@app.task(name="hello_world", group=PYTHON_TASK_GROUP)
def hello_world() -> str:
def hello_world(user: dict | None) -> str: # pylint: disable=unused-argument
    return "Hello world"


# --8<-- [end:hello_world]
# --8<-- [start:hello_user]
@app.task(name="hello_user", group=PYTHON_TASK_GROUP)
def hello_user(user: str | None) -> str:
def hello_user(user: dict | None) -> str:
    greeting = "Hello "
    if user is None:
        user = "unknown"
    else:
        user = user["id"]
    return greeting + user


# --8<-- [end:hello_user]
# --8<-- [start:hello_user_progress]
@app.task(name="hello_user_progress", group=PYTHON_TASK_GROUP)
async def hello_user_progress(user: str | None, progress: RateProgress) -> str:
async def hello_user_progress(user: dict | None, progress: RateProgress) -> str:
    greeting = "Hello "
    await progress(0.5)
    if user is None:
        user = "unknown"
    else:
        user = user["id"]
    res = greeting + user
    await progress(1)
    return res
10 changes: 9 additions & 1 deletion mkdocs.yml
@@ -6,6 +6,7 @@ repo_url: https://github.com/ICIJ/datashare-python
theme:
  name: material
  icon:
    annotation: material/arrow-right-circle
    repo: fontawesome/brands/github
  favicon: assets/favicon.png
  logo: assets/datashare-logo-color-square.svg
@@ -83,7 +84,14 @@ nav:
      - Building an app: learn/app.md
      - Running a worker pool with the icij-worker CLI: learn/run-worker.md
      - Create tasks for Datashare: learn/datashare-app.md
      - Get started: get-started/index.md
      - Get started:
          - get-started/index.md
          - Implement your own Datashare worker:
              - How to use the worker template: get-started/implement/index.md
              - Basic worker: get-started/implement/worker-basic.md
              - Advanced worker: get-started/implement/worker-advanced.md
          - Build your worker image: get-started/build.md
          - Run using Docker Compose: get-started/run.md
      - Guides:
          - guides/index.md
          - Configuration:
10 changes: 6 additions & 4 deletions ml_worker/tasks/classify_docs.py
@@ -65,17 +65,19 @@ async def create_classification_tasks(
        return task_ids
    logger.info("found %s unclassified documents !", n_docs)
    fetch_unclassified_progress = 0.5
    progress(fetch_unclassified_progress)
    if progress is not None:
        await progress(fetch_unclassified_progress)
    # Roughly split the load between workers:
    # - they should approximately receive the same amount of work
    # - they should receive tasks which are long enough to avoid model loading overhead
    # - task should be short enough to avoid starting all over again from scratch in
    #   case of failure
    n_tasks = max(n_docs // n_workers, n_docs // (n_workers * 5), 1)
    task_batch_size = n_docs // n_tasks
    # We scale the progress to post incremental progress updates from 0 to n_tasks
    progress = to_scaled_progress(progress, start=fetch_unclassified_progress)
    progress = to_raw_progress(progress, max_progress=n_tasks)
    if progress is not None:
        # We scale the progress to post incremental progress updates from 0 to n_tasks
        progress = to_scaled_progress(progress, start=fetch_unclassified_progress)
        progress = to_raw_progress(progress, max_progress=n_tasks)
    logger.info("creating %s classification tasks...", n_tasks)
    # We create classification tasks which will be picked up by the workers
    args = {"project": project, "config": config.dict(), "language": language}
