Skip to content

Commit

Permalink
refactor: reduce duplicated request codes and add new data processing…
Browse files Browse the repository at this point in the history
… method
  • Loading branch information
100gle committed Oct 7, 2023
1 parent d31a775 commit df32588
Show file tree
Hide file tree
Showing 14 changed files with 352 additions and 292 deletions.
33 changes: 32 additions & 1 deletion backend/src/module/notification/base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import logging
from abc import ABC, abstractmethod
from textwrap import dedent
from typing import Any, Dict, Literal, Optional, TypeAlias, TypeVar
from typing import Any, Dict, Literal, Optional

import aiohttp
from pydantic import BaseModel, Field
from tenacity import after_log, retry, stop_after_attempt, wait_fixed

logger = logging.getLogger(__name__)

DEFAULT_MESSAGE_TEMPLATE = dedent(
"""\
Expand Down Expand Up @@ -31,3 +35,30 @@ class NotifierAdapter(BaseModel, ABC):
@abstractmethod
def send(self, *args, **kwargs):
raise NotImplementedError("send method is not implemented yet.")


_Mapping = Dict[str, Any]


class NotifierRequestMixin:
@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(5),
after=after_log(logger, logging.ERROR),
)
async def asend(
self,
entrypoint: str,
base_url: Optional[str] = None,
method: Literal["GET", "POST"] = "GET",
data: Optional[_Mapping] = None,
params: Optional[_Mapping] = None,
headers: Optional[_Mapping] = None,
) -> Any:
"""asend is a async send method."""
async with aiohttp.ClientSession(base_url=base_url) as req:
resp: aiohttp.ClientResponse = await req.request(
method, entrypoint, data=data, params=params, headers=headers
)

return await resp.json()
64 changes: 43 additions & 21 deletions backend/src/module/notification/services/bark.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,71 @@
import asyncio
import logging
from typing import Any, Dict
from datetime import datetime
from typing import Optional

import aiohttp
from pydantic import BaseModel, Field

from module.models import Notification
from module.notification.base import NotifierAdapter
from module.notification.base import (
DEFAULT_LOG_TEMPLATE,
NotifierAdapter,
NotifierRequestMixin,
)

logger = logging.getLogger(__name__)


class BarkMessage(BaseModel):
title: str = Field("AutoBangumi", description="title")
body: str = Field(..., description="body")
icon: str = Field(..., description="icon")
icon: Optional[str] = Field(None, description="icon")
device_key: str = Field(..., description="device_key")


class BarkService(NotifierAdapter):
class BarkService(NotifierAdapter, NotifierRequestMixin):
token: str = Field(..., description="device_key")
base_url: str = Field("https://api.day.app", description="base_url")

async def _send(self, data: Dict[str, Any]) -> Any:
try:
async with aiohttp.ClientSession(base_url=self.base_url) as req:
resp: aiohttp.ClientResponse = await req.post("/push", data=data)
def _process_input(self, **kwargs):
notification: Optional[Notification] = kwargs.pop("notification", None)
record: Optional[logging.LogRecord] = kwargs.pop("record", None)

return await resp.json()
if notification:
message = self.template.format(**notification.dict())
data = BarkMessage(
title=notification.official_title,
body=message,
icon=notification.poster_path,
device_key=self.token,
)
return data

except Exception as e:
logger.error(f"Bark notification error: {e}")
if record:
if hasattr(record, "asctime"):
dt = record.asctime
else:
dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def send(self, notification: Notification, *args, **kwargs):
message = self.template.format(**notification.dict())
message = DEFAULT_LOG_TEMPLATE.format(
dt=dt,
levelname=record.levelname,
msg=record.msg,
)
data = BarkMessage(body=message, device_key=self.token)
return data

data = BarkMessage(
title=notification.official_title,
body=message,
icon=notification.poster_path,
device_key=self.token,
).dict()
raise ValueError("Can't get notification or record input.")

def send(self, **kwargs):
data = self._process_input(**kwargs)
loop = asyncio.get_event_loop()
res = loop.run_until_complete(self._send(data=data))
req = self.asend(
entrypoint="/push",
base_url=self.base_url,
method="POST",
data=data.dict(),
)
res = loop.run_until_complete(req)

if res:
logger.debug(f"Bark notification: {res}")
Expand Down
55 changes: 38 additions & 17 deletions backend/src/module/notification/services/gotify.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import asyncio
import logging
from datetime import datetime
from typing import Any, Dict, Optional

import aiohttp
from pydantic import BaseModel, Field

from module.models import Notification
from module.notification.base import NotifierAdapter
from module.notification.base import (
DEFAULT_LOG_TEMPLATE,
NotifierAdapter,
NotifierRequestMixin,
)

logger = logging.getLogger(__name__)

Expand All @@ -21,31 +25,48 @@ class GotifyMessage(BaseModel):
)


class GotifyService(NotifierAdapter):
class GotifyService(NotifierAdapter, NotifierRequestMixin):
"""GotifyService is a class for gotify notification service"""

token: str = Field(..., description="gotify client or app token")
base_url: str = Field(..., description="gotify base url")

async def _send(self, data: Dict[str, Any]) -> Any:
async with aiohttp.ClientSession(base_url=self.base_url) as req:
try:
resp: aiohttp.ClientResponse = await req.post(
"/message", params={"token": self.token}, data=data
)
def _process_input(self, **kwargs):
notification: Optional[Notification] = kwargs.pop("notification", None)
record: Optional[logging.LogRecord] = kwargs.pop("record", None)

return await resp.json()
if notification:
message = self.template.format(**notification.dict())
data = GotifyMessage(message=message)
return data

except Exception as e:
logger.error(f"Gotify notification error: {e}")
if record:
if hasattr(record, "asctime"):
dt = record.asctime
else:
dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def send(self, notification: Notification, *args, **kwargs):
message = self.template.format(**notification.dict())
message = DEFAULT_LOG_TEMPLATE.format(
dt=dt,
levelname=record.levelname,
msg=record.msg,
)
data = GotifyMessage(message=message, priority=8)
return data

# TODO: priority should be aliased with log level
data = GotifyMessage(message=message).dict()
raise ValueError("Can't get notification or record input.")

def send(self, **kwargs) -> Any:
data = self._process_input(**kwargs)
loop = asyncio.get_event_loop()
res = loop.run_until_complete(self._send(data=data))
req = self.asend(
entrypoint="/message",
base_url=self.base_url,
method="POST",
params={"token": self.token},
data=data.dict(),
)
res = loop.run_until_complete(req)

if res:
logger.debug(f"Gotify notification: {res}")
Expand Down
59 changes: 39 additions & 20 deletions backend/src/module/notification/services/server_chan.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import asyncio
import logging
from typing import Any, Dict
from datetime import datetime
from typing import Optional

import aiohttp
from pydantic import BaseModel, Field

from module.models import Notification
from module.notification.base import NotifierAdapter
from module.notification.base import (
DEFAULT_LOG_TEMPLATE,
NotifierAdapter,
NotifierRequestMixin,
)

logger = logging.getLogger(__name__)

Expand All @@ -16,32 +20,47 @@ class ServerChanMessage(BaseModel):
desp: str = Field(..., description="description")


class ServerChanService(NotifierAdapter):
class ServerChanService(NotifierAdapter, NotifierRequestMixin):
token: str = Field(..., description="server chan token")
base_url: str = Field("https://sctapi.ftqq.com", description="server chan base url")

async def _send(self, data: Dict[str, Any]) -> Any:
async with aiohttp.ClientSession(base_url=self.base_url) as req:
try:
resp: aiohttp.ClientResponse = await req.get(
f"/{self.token}.send", params=data
)
def _process_input(self, **kwargs):
notification: Optional[Notification] = kwargs.pop("notification", None)
record: Optional[logging.LogRecord] = kwargs.pop("record", None)

return await resp.json()
if notification:
message = self.template.format(**notification.dict())
data = ServerChanMessage(
title=notification.official_title,
desp=message,
)
return data

except Exception as e:
logger.error(f"ServerChan notification error: {e}")
if record:
if hasattr(record, "asctime"):
dt = record.asctime
else:
dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def send(self, notification: Notification, *args, **kwargs):
message = self.template.format(**notification.dict())
message = DEFAULT_LOG_TEMPLATE.format(
dt=dt,
levelname=record.levelname,
msg=record.msg,
)
data = ServerChanMessage(desp=message)
return data

data = ServerChanMessage(
title=notification.official_title,
desp=message,
).dict()
raise ValueError("Can't get notification or record input.")

def send(self, **kwargs):
data = self._process_input(**kwargs)
loop = asyncio.get_event_loop()
res = loop.run_until_complete(self._send(data=data))
req = self.asend(
entrypoint=f"/{self.token}.send",
base_url=self.base_url,
params=data.dict(),
)
res = loop.run_until_complete(req)

if res:
logger.debug(f"Telegram notification: {res}")
Expand Down
Loading

0 comments on commit df32588

Please sign in to comment.