feature: implement icij-worker workers start
ClemDoum committed Feb 2, 2024
1 parent 4b6ba8d commit 03645e0
Showing 25 changed files with 556 additions and 117 deletions.
2 changes: 1 addition & 1 deletion neo4j
@@ -598,7 +598,7 @@ Please run ./neo4j format"
command -v poetry 1>/dev/null 2>&1 || echo "poetry is not installed"

cd "$PYTHON_APP_DIR"
- poetry install --no-interaction -vvv --with=dev --no-ansi
+ poetry install --no-interaction -vvv --with=dev --with=cli --no-ansi
echo "Python setup succeeded !"
elif _should_run_project "neo4j_graph_widget_plugin"; then
_print_step "Setting up neo4j_graph_widget_plugin"
4 changes: 2 additions & 2 deletions neo4j-app/neo4j_app/app/dependencies.py
@@ -70,7 +70,7 @@ def _lifespan_async_app_config_path() -> Path:
return _ASYNC_APP_CONFIG_PATH


- def loggers_enter(**_):
+ def http_loggers_enter(**_):
config = lifespan_config()
config.setup_loggers()
logger.info("Loggers ready to log 💬")
@@ -147,7 +147,7 @@ async def run_http_service_deps(

HTTP_SERVICE_LIFESPAN_DEPS = [
("configuration reading", config_enter, None),
("loggers setup", loggers_enter, None),
("loggers setup", http_loggers_enter, None),
(
"write async config for workers",
write_async_app_config_enter,
15 changes: 15 additions & 0 deletions neo4j-app/neo4j_app/core/utils/logging.py
@@ -2,6 +2,7 @@

import contextlib
import logging
import sys
from abc import ABC
from datetime import datetime
from functools import wraps
@@ -94,3 +95,17 @@ def log_elapsed_time_cm(
"[%(levelname)s][%(asctime)s.%(msecs)03d][%(workerid)s][%(name)s]: %(message)s"
)
DATE_FMT = "%H:%M:%S"


def setup_loggers():
import neo4j_app

loggers = [neo4j_app.__name__, "__main__"]
level = logging.INFO
stream_handler = logging.StreamHandler(sys.stderr)
stream_handler.setFormatter(logging.Formatter(STREAM_HANDLER_FMT, DATE_FMT))
for logger in loggers:
logger = logging.getLogger(logger)
logger.setLevel(level)
logger.handlers = []
logger.addHandler(stream_handler)
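
Side note on the handler format defined above: STREAM_HANDLER_FMT expects a %(workerid)s attribute on every log record, which the standard logging module does not set by itself. The commit does not show how that attribute is injected, so the snippet below only sketches one possible way to do it with a logging.Filter; the filter class and the worker id value are assumptions, not code from this change.

# Illustrative sketch only: how %(workerid)s could be stamped onto log records.
# The real injection mechanism is not shown in this diff.
import logging
import sys

STREAM_HANDLER_FMT = (
    "[%(levelname)s][%(asctime)s.%(msecs)03d][%(workerid)s][%(name)s]: %(message)s"
)
DATE_FMT = "%H:%M:%S"


class WorkerIdFilter(logging.Filter):
    def __init__(self, worker_id: str):
        super().__init__()
        self._worker_id = worker_id

    def filter(self, record: logging.LogRecord) -> bool:
        # Attach the worker id so the formatter can interpolate %(workerid)s
        record.workerid = self._worker_id
        return True


handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter(STREAM_HANDLER_FMT, DATE_FMT))
handler.addFilter(WorkerIdFilter("worker-main-0"))
worker_logger = logging.getLogger("neo4j_app")
worker_logger.setLevel(logging.INFO)
worker_logger.addHandler(handler)
worker_logger.info("Loggers ready to log")
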
4 changes: 4 additions & 0 deletions neo4j-app/neo4j_app/icij_worker/__main__.py
@@ -0,0 +1,4 @@
from neo4j_app.icij_worker.cli import cli_app

if __name__ == "__main__":
cli_app()
55 changes: 18 additions & 37 deletions neo4j-app/neo4j_app/icij_worker/backend/backend.py
@@ -1,10 +1,16 @@
import logging
from contextlib import contextmanager
from enum import Enum
from pathlib import Path
from typing import Dict, Optional

from neo4j_app.icij_worker import WorkerConfig
- from neo4j_app.icij_worker.backend.mp import run_workers_with_multiprocessing
+ from neo4j_app.icij_worker.backend.mp import (
+ run_workers_with_multiprocessing,
+ run_workers_with_multiprocessing_cm,
+ )

logger = logging.getLogger(__name__)


class WorkerBackend(str, Enum):
@@ -14,6 +20,9 @@ class WorkerBackend(str, Enum):
# workers for IO based tasks
MULTIPROCESSING = "multiprocessing"

# TODO: refactor this one to be a function rather than a cm coroutine: a context
# manager is no longer needed to run workers inside the HTTP server
@contextmanager
def run(
self,
app: str,
@@ -22,17 +31,13 @@ def run(
worker_extras: Optional[Dict] = None,
app_deps_extras: Optional[Dict] = None,
):
# This function is meant to be run as the main function of a Python command,
# in this case we want the main process to handle signals
with self._run_cm(
run_workers_with_multiprocessing(
app,
n_workers,
config,
handle_signals=True,
worker_extras=worker_extras,
app_deps_extras=app_deps_extras,
):
pass
)

# TODO: remove this when the HTTP server doesn't
# TODO: also refactor underlying functions to be simple function rather than
@@ -45,36 +50,12 @@ def run_cm(
config: WorkerConfig,
worker_extras: Optional[Dict] = None,
app_deps_extras: Optional[Dict] = None,
):
# This usage is meant for when a backend is run from another process which
# handles signals by itself
with self._run_cm(
app,
n_workers,
config,
handle_signals=False,
worker_extras=worker_extras,
app_deps_extras=app_deps_extras,
):
yield

@contextmanager
def _run_cm(
self,
app: str,
n_workers: int,
config: WorkerConfig,
*,
handle_signals: bool = False,
worker_extras: Optional[Dict] = None,
app_deps_extras: Optional[Dict] = None,
):
if self is WorkerBackend.MULTIPROCESSING:
with run_workers_with_multiprocessing(
with run_workers_with_multiprocessing_cm(
app,
n_workers,
config,
handle_signals=handle_signals,
worker_extras=worker_extras,
app_deps_extras=app_deps_extras,
):
@@ -84,15 +65,15 @@ def _run_cm(


def start_workers(
app: str,
n_workers: int,
config_path: Optional[Path],
backend: WorkerBackend,
app: str, n_workers: int, config_path: Optional[Path], backend: WorkerBackend
):
if n_workers < 1:
raise ValueError("n_workers must be >= 1")
if config_path is not None:
logger.info("Loading worker configuration from %s", config_path)
config = WorkerConfig.parse_file(config_path)
else:
config = WorkerConfig()
logger.info("Loading worker configuration from env...")
config = WorkerConfig.from_env()

backend.run(app, n_workers=n_workers, config=config)
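
For reference, a hedged sketch of driving start_workers programmatically, mirroring what the CLI command added in this commit does; the dotted app path and the config file name below are placeholders, not values from this change.

# Illustrative usage sketch only; "my_module.my_async_app" and the JSON file
# are placeholders for a real fully qualified AsyncApp path and WorkerConfig file.
from pathlib import Path

from neo4j_app.icij_worker import WorkerBackend
from neo4j_app.icij_worker.backend import start_workers

start_workers(
    app="my_module.my_async_app",
    n_workers=2,
    config_path=Path("worker-config.json"),  # pass None to read the config from env
    backend=WorkerBackend.MULTIPROCESSING,
)
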
102 changes: 82 additions & 20 deletions neo4j-app/neo4j_app/icij_worker/backend/mp.py
@@ -1,10 +1,11 @@
import functools
import logging
import multiprocessing
import os
import signal
import sys
from contextlib import contextmanager
from typing import Dict, Optional
from typing import Callable, Dict, List, Optional, Tuple

from neo4j_app.icij_worker import AsyncApp, Worker, WorkerConfig

@@ -39,8 +40,9 @@ def _mp_work_forever(
try:
worker.work_forever()
finally:
worker.info("worker stopped working, tearing down %s dependencies", app.name)
logger.info("worker stopped working, tearing down %s dependencies", app.name)
worker.loop.run_until_complete(deps_cm.__aexit__(*sys.exc_info()))
logger.info("dependencies closed for %s !", app.name)


def signal_handler(sig_num, *_):
@@ -55,37 +57,43 @@ def setup_main_process_signal_handlers():
signal.signal(s, signal_handler)


@contextmanager
def run_workers_with_multiprocessing(
def _get_mp_async_runner(
app: str,
n_workers: int,
config: WorkerConfig,
n_workers: int,
*,
handle_signals: bool = True,
worker_extras: Optional[Dict] = None,
app_deps_extras: Optional[Dict] = None,
):
logger.info("Creating multiprocessing worker pool with %s workers", n_workers)
) -> Tuple[multiprocessing.Pool, List[Tuple[str, Callable[[], None]]]]:
# This function is here to avoid code duplication; it will be removed

# Here we set maxtasksperchild to 1. Each worker has a single never-ending task
# which consists in working forever. Additionally, in some cases using
# maxtasksperchild=1 seems to help to terminate the worker pool
# (cpython bug: https://github.com/python/cpython/pull/8009)
mp_ctx = multiprocessing.get_context("spawn")
pool = mp_ctx.Pool(n_workers, maxtasksperchild=1)
main_process_id = os.getpid()
# TODO: make this a bit more informative by, for instance, adding the child process ID
worker_ids = [f"worker-{main_process_id}-{i}" for i in range(n_workers)]
kwds = {"app": app, "config": config}
kwds["worker_extras"] = worker_extras
kwds["app_deps_extras"] = app_deps_extras
pool = mp_ctx.Pool(n_workers, maxtasksperchild=1)
logger.debug("Setting up signal handlers...")
if handle_signals:
setup_main_process_signal_handlers()
kwds = {
"app": app,
"config": config,
"worker_extras": worker_extras,
"app_deps_extras": app_deps_extras,
}
runners = []
for w_id in worker_ids:
kwds.update({"worker_id": w_id})
runners.append(
(w_id, functools.partial(pool.apply_async, _mp_work_forever, kwds=kwds))
)
return pool, runners


@contextmanager
def _handle_pool_termination(pool: multiprocessing.Pool, handle_signals: bool):
try:
for w_id in worker_ids:
kwds.update({"worker_id": w_id})
logger.info("starting worker %s", w_id)
pool.apply_async(_mp_work_forever, kwds=kwds)
yield
except KeyboardInterrupt as e:
if not handle_signals:
@@ -102,5 +110,59 @@ def run_workers_with_multiprocessing(
finally:
logger.info("Sending termination signal to workers (SIGTERM)...")
pool.terminate()
- pool.join() # Wait forever
+ pool.join()
logger.info("Terminated worker pool !")


@contextmanager
def run_workers_with_multiprocessing_cm(
app: str,
n_workers: int,
config: WorkerConfig,
*,
worker_extras: Optional[Dict] = None,
app_deps_extras: Optional[Dict] = None,
):
if n_workers < 1:
raise ValueError("n_workers must be >=1")
logger.info("Creating multiprocessing worker pool with %s workers", n_workers)
pool, worker_runners = _get_mp_async_runner(
app,
config,
n_workers,
worker_extras=worker_extras,
app_deps_extras=app_deps_extras,
)
for w_id, process_runner in worker_runners:
logger.info("starting worker %s", w_id)
process_runner()
with _handle_pool_termination(pool, False):
yield


def run_workers_with_multiprocessing(
app: str,
n_workers: int,
config: WorkerConfig,
*,
worker_extras: Optional[Dict] = None,
app_deps_extras: Optional[Dict] = None,
):
if n_workers < 1:
raise ValueError("n_workers must be >=1")
logger.info("Creating multiprocessing worker pool with %s workers", n_workers)
pool, worker_runners = _get_mp_async_runner(
app,
config,
n_workers,
worker_extras=worker_extras,
app_deps_extras=app_deps_extras,
)
setup_main_process_signal_handlers()
tasks = []
for w_id, process_runner in worker_runners:
logger.info("starting worker %s", w_id)
tasks.append(process_runner())
first = tasks[0]
with _handle_pool_termination(pool, True):
first.get()
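
The pool construction above is the part worth illustrating in isolation: a "spawn" multiprocessing context, maxtasksperchild=1, one never-returning task submitted per worker slot, and the main process blocking on the first result. The toy worker body below is purely illustrative; the real workers run AsyncApp tasks.

# Stand-alone toy sketch of the pool pattern (spawn context, maxtasksperchild=1,
# apply_async per worker, block on the first result). Not code from this commit.
import multiprocessing
import time


def toy_work_forever(worker_id: str):
    while True:
        print(f"{worker_id} is working...")
        time.sleep(1)


if __name__ == "__main__":
    mp_ctx = multiprocessing.get_context("spawn")
    pool = mp_ctx.Pool(2, maxtasksperchild=1)
    try:
        tasks = [
            pool.apply_async(toy_work_forever, (f"worker-{i}",)) for i in range(2)
        ]
        # Block on the first worker; a timeout stands in here for SIGTERM/SIGINT
        tasks[0].get(timeout=3)
    except multiprocessing.TimeoutError:
        pass
    finally:
        pool.terminate()
        pool.join()
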
32 changes: 32 additions & 0 deletions neo4j-app/neo4j_app/icij_worker/cli/__init__.py
@@ -0,0 +1,32 @@
import importlib.metadata
from typing import Annotated, Optional

import typer

from neo4j_app.core.utils.logging import setup_loggers
from neo4j_app.icij_worker.cli.workers import worker_app

cli_app = typer.Typer(context_settings={"help_option_names": ["-h", "--help"]})
cli_app.add_typer(worker_app)


def version_callback(value: bool):
if value:
import neo4j_app

package_version = importlib.metadata.version(neo4j_app.__name__)
print(package_version)
raise typer.Exit()


@cli_app.callback(name="icij-worker")
def main(
version: Annotated[ # pylint: disable=unused-argument
Optional[bool],
typer.Option( # pylint: disable=unused-argument
"--version", callback=version_callback, is_eager=True
),
] = None,
):
"""Python async worker pool CLI 🧑‍🏭"""
setup_loggers()
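
A hedged sketch of exercising the eager --version option through Typer's test runner; it assumes the neo4j_app package metadata is installed so importlib.metadata.version can resolve it.

# Illustrative sketch: the --version flag short-circuits via the eager callback.
from typer.testing import CliRunner

from neo4j_app.icij_worker.cli import cli_app

runner = CliRunner()
result = runner.invoke(cli_app, ["--version"])
print(result.output)  # the installed neo4j_app package version
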
42 changes: 42 additions & 0 deletions neo4j-app/neo4j_app/icij_worker/cli/workers.py
@@ -0,0 +1,42 @@
from pathlib import Path
from typing import Annotated, Optional

import typer

from neo4j_app.icij_worker import AsyncApp, WorkerConfig
from neo4j_app.icij_worker import WorkerBackend
from neo4j_app.icij_worker.backend import start_workers


_START_HELP = "Start a pool of workers running the provided app, reading the worker\
configuration from the environment or an optionally provided file."
_APP_HELP = f'Path of the {AsyncApp.__name__} instance to run, fully qualified\
("module.sub.module.app_instance")'
_CONFIG_HELP = f"""Path to a serialized {WorkerConfig.__name__} JSON file.
By default, the configuration is read from the environment.
If present, file values will override environment variables values."""
_N_HELP = "Number of workers."
_BACKEND_HELP = "Python asynchronous backend used to create the worker pool."

_DEFAULT_BACKEND = WorkerBackend.MULTIPROCESSING

worker_app = typer.Typer(name="workers")


@worker_app.command(help=_START_HELP)
def start(
app: Annotated[str, typer.Argument(help=_APP_HELP)],
config: Annotated[
Optional[Path], typer.Option("-c", "--config", help=_CONFIG_HELP)
] = None,
n: Annotated[int, typer.Option("--n-workers", "-n", help=_N_HELP)] = 1,
backend: Annotated[
WorkerBackend,
typer.Option(
help=_BACKEND_HELP,
case_sensitive=False,
show_default=_DEFAULT_BACKEND.value,
),
] = _DEFAULT_BACKEND,
):
start_workers(app=app, n_workers=n, config_path=config, backend=backend)
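
Assuming the package is installed (the neo4j script above now installs the cli dependency group), the new command would typically be launched as python -m neo4j_app.icij_worker workers start my_module.my_async_app -n 2, where the dotted app path is a placeholder. The snippet below is a hedged sketch that only prints the command's help through Typer's test runner, so nothing long-running is started.

# Hedged sketch: inspect the new "workers start" command in-process.
# Only --help is invoked to avoid actually starting a worker pool.
from typer.testing import CliRunner

from neo4j_app.icij_worker.cli import cli_app

runner = CliRunner()
result = runner.invoke(cli_app, ["workers", "start", "--help"])
print(result.output)
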
2 changes: 1 addition & 1 deletion neo4j-app/neo4j_app/icij_worker/utils/dependencies.py
@@ -71,7 +71,7 @@ async def run_deps(
exit_fn(*exc_info)
except Exception as e: # pylint: disable=broad-exception-caught
to_raise.append(e)
logger.debug("Rolled back all dependencies for %s!", ctx)
logger.debug("Rolled back all dependencies for %s !", ctx)
if to_raise:
for e in to_raise:
logger.error("Error while handling dependencies %s!", e)