diff --git a/pyproject.toml b/pyproject.toml index 3113b228..11966d2b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -94,5 +94,5 @@ exclude = ''' include_trailing_comma = true multi_line_output = 3 known_first_party = "safir" -known_third_party = ["aiohttp", "pytest", "setuptools", "structlog"] +known_third_party = ["aiohttp", "aiokafka", "pytest", "setuptools", "structlog"] skip = ["docs/conf.py"] diff --git a/setup.cfg b/setup.cfg index f9bc097e..365cb917 100644 --- a/setup.cfg +++ b/setup.cfg @@ -37,6 +37,7 @@ install_requires = importlib_metadata; python_version < "3.8" structlog aiohttp + aiokafka [options.packages.find] where = src diff --git a/src/safir/events.py b/src/safir/events.py index 1da93d8c..2666b0ef 100644 --- a/src/safir/events.py +++ b/src/safir/events.py @@ -2,13 +2,15 @@ from __future__ import annotations +import asyncio import ssl from pathlib import Path from typing import TYPE_CHECKING import structlog +from aiokafka import AIOKafkaProducer -__all__ = ["configure_kafka_ssl"] +__all__ = ["configure_kafka_ssl", "init_kafka_producer"] if TYPE_CHECKING: from typing import AsyncGenerator @@ -109,3 +111,60 @@ async def configure_kafka_ssl(app: Application) -> AsyncGenerator: logger.info("Created Kafka SSL context") yield + + +async def init_kafka_producer(app: Application) -> AsyncGenerator: + """Initialize and cleanup the aiokafka producer instance. + + Parameters + ---------- + app : `aiohttp.web.Application` + The aiohttp.web-based application. This app *must* include a standard + configuration object at the ``"safir/config"`` key. The config must + have these attributes: + + ``logger_name`` + Name of the application's logger. + ``kafka_broker_url`` + The URL of a Kafka broker. + ``kafka_protocol`` + The protocol for Kafka broker communication. + + Additionally, `configure_kafka_ssl` must be applied **before** this + initializer so that ``safir/kafka_ssl_context`` is set on the + application. + + Notes + ----- + This initializer adds an `aiokafka.AIOKafkaProducer` instance to the + ``app`` under the ``safir/kafka_producer`` key. + + Examples + -------- + Use this function as a `cleanup context + `__. + + To access the producer: + + .. code-block:: python + + producer = app["safir/kafka_producer"] + """ + # Startup phase + logger = structlog.get_logger(app["safir/config"].logger_name) + logger.info("Starting Kafka producer") + producer = AIOKafkaProducer( + loop=asyncio.get_running_loop(), + bootstrap_servers=app["safir/config"].kafka_broker_url, + ssl_context=app["safir/kafka_ssl_context"], + security_protocol=app["safir/kafka_protocol"], + ) + await producer.start() + app["safir/kafka_producer"] = producer + logger.info("Finished starting Kafka producer") + + yield + + # Cleanup phase + logger.info("Shutting down Kafka producer") + await producer.stop()