Skip to content

Commit

Permalink
feat(backend-python): add wrappers for nats client in lib.Service and…
Browse files Browse the repository at this point in the history
… fix various bugs, add test script for development (will be deleted before merge into main)
  • Loading branch information
cb0s committed Mar 14, 2024
1 parent fa72884 commit 944d2d0
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 10 deletions.
19 changes: 14 additions & 5 deletions backend-python/src/telestion/backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pathlib import Path
from typing import Any, TypeVar

from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, ConfigDict


class TelestionConfig(BaseModel):
Expand All @@ -19,12 +19,20 @@ class TelestionConfig(BaseModel):

unparsed_cli: list[str] = Field(alias="_telestion_validator_unparsed_cli")

# To include all envs and config parts -> it is recommended to add a custom subtype
model_config = ConfigDict(
extra='allow'
)


# With this we allow users to extend TelestionConfig for finer control over custom config fields
_TelestionConfigT = TypeVar("_TelestionConfigT", bound=TelestionConfig, default=TelestionConfig)
_TelestionConfigT = TypeVar("_TelestionConfigT", bound=TelestionConfig)


def build_config(clazz: type[_TelestionConfigT] = None) -> _TelestionConfigT:
if clazz is None:
clazz = TelestionConfig

def build_config() -> _TelestionConfigT:
cli_args, additional_args = _parse_cli()

def _from_env_or_cli(key: str):
Expand All @@ -34,7 +42,7 @@ def _from_env_or_cli(key: str):
config_key = _from_env_or_cli('CONFIG_KEY')

config_assembly: dict[str, Any] = dict()
if 'dev' in cli_args:
if 'dev' in cli_args and cli_args['dev']:
# 1. Add default config
config_assembly.update(defaults())

Expand All @@ -52,7 +60,7 @@ def _from_env_or_cli(key: str):
# 5. Add args that cannot be parsed by the pipeline, i.e. service specific config
config_assembly['_telestion_validator_unparsed_cli'] = additional_args

return TelestionConfig(**config_assembly)
return clazz(**config_assembly)


def defaults() -> dict[str, Any]:
Expand All @@ -71,6 +79,7 @@ def _parse_cli() -> tuple[dict[str, Any], list[str]]:
description=description,
epilog=epilog,
prog="Telestion-CLI (Python)",
argument_default=argparse.SUPPRESS
)

parser.add_argument("--dev", action='store_true', help="If set, program will start in development mode")
Expand Down
42 changes: 37 additions & 5 deletions backend-python/src/telestion/backend/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from typing import Any

import nats
from nats.aio.client import Client as NatsClient # mostly for type hinting
from nats.aio.client import Client as NatsClient, Msg as NatsMsg, DEFAULT_FLUSH_TIMEOUT # mostly for type hinting
from nats.aio.subscription import Subscription

from telestion.backend.config import TelestionConfig, build_config

Expand All @@ -16,6 +17,31 @@ class Service:
service_name: str
config: TelestionConfig

# wrapper methods for NatsClient instance for convenience
async def publish(self, **kwargs) -> None:
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.publish"""
await self.nc.publish(**kwargs)

async def subscribe(self, **kwargs) -> Subscription:
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.subscribe"""
return await self.nc.subscribe(**kwargs)

async def request(self, **kwargs) -> NatsMsg:
"""Wrapper for Client.request(subject, payload, timeout, old_style, headers)"""
return await self.nc.request(**kwargs)

async def flush(self, timeout: int = DEFAULT_FLUSH_TIMEOUT) -> None:
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.flush"""
await self.nc.flush(timeout)

async def drain(self) -> None:
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.drain"""
await self.nc.drain()

async def close(self) -> None:
"""Wrapper for https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.close"""
await self.nc.close()


@dataclass
class Options:
Expand Down Expand Up @@ -47,16 +73,22 @@ async def start_service(opts: Options = None) -> Service:
if not opts.nats or opts.custom_nc is not None:
return service

nc = await nats.connect(servers=_prepare_nats_url(config))
async def error_cb(err):
print(err)

nc = await nats.connect(servers=_prepare_nats_url(config), error_cb=error_cb)
# Setup healthcheck
await nc.subscribe(
'__telestion__.health',
cb=lambda msg: msg.respond(
async def respond(msg):
msg.respond(
json_encode({
"errors": 0,
"name": config.service_name
})
)

await nc.subscribe(
'__telestion__.health',
cb=respond
)

return replace(service, nc=nc)
Expand Down
18 changes: 18 additions & 0 deletions backend-python/src/telestion/backend/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import asyncio

from telestion.backend.config import build_config, TelestionConfig
from telestion.backend.lib import start_service


async def main():
import sys
# Put dev server address here for testing
sys.argv.extend(['--dev', '--NATS_URL', 'nats://172.21.73.221:4222'])
_config = build_config(TelestionConfig)
print(_config)
service = await start_service()
await service.nc.close()


if __name__ == '__main__':
asyncio.run(main())

0 comments on commit 944d2d0

Please sign in to comment.