diff --git a/backend/src/module/notification/base.py b/backend/src/module/notification/base.py index 96d1f4de..64a43ebc 100644 --- a/backend/src/module/notification/base.py +++ b/backend/src/module/notification/base.py @@ -1,9 +1,13 @@ +import logging from abc import ABC, abstractmethod from textwrap import dedent -from typing import Any, Dict, Literal, Optional, TypeAlias, TypeVar +from typing import Any, Dict, Literal, Optional import aiohttp from pydantic import BaseModel, Field +from tenacity import after_log, retry, stop_after_attempt, wait_fixed + +logger = logging.getLogger(__name__) DEFAULT_MESSAGE_TEMPLATE = dedent( """\ @@ -31,3 +35,30 @@ class NotifierAdapter(BaseModel, ABC): @abstractmethod def send(self, *args, **kwargs): raise NotImplementedError("send method is not implemented yet.") + + +_Mapping = Dict[str, Any] + + +class NotifierRequestMixin: + @retry( + stop=stop_after_attempt(3), + wait=wait_fixed(5), + after=after_log(logger, logging.ERROR), + ) + async def asend( + self, + entrypoint: str, + base_url: Optional[str] = None, + method: Literal["GET", "POST"] = "GET", + data: Optional[_Mapping] = None, + params: Optional[_Mapping] = None, + headers: Optional[_Mapping] = None, + ) -> Any: + """asend is a async send method.""" + async with aiohttp.ClientSession(base_url=base_url) as req: + resp: aiohttp.ClientResponse = await req.request( + method, entrypoint, data=data, params=params, headers=headers + ) + + return await resp.json() diff --git a/backend/src/module/notification/services/bark.py b/backend/src/module/notification/services/bark.py index add4c0e9..160eaab3 100644 --- a/backend/src/module/notification/services/bark.py +++ b/backend/src/module/notification/services/bark.py @@ -1,12 +1,16 @@ import asyncio import logging -from typing import Any, Dict +from datetime import datetime +from typing import Optional -import aiohttp from pydantic import BaseModel, Field from module.models import Notification -from module.notification.base import NotifierAdapter +from module.notification.base import ( + DEFAULT_LOG_TEMPLATE, + NotifierAdapter, + NotifierRequestMixin, +) logger = logging.getLogger(__name__) @@ -14,36 +18,54 @@ class BarkMessage(BaseModel): title: str = Field("AutoBangumi", description="title") body: str = Field(..., description="body") - icon: str = Field(..., description="icon") + icon: Optional[str] = Field(None, description="icon") device_key: str = Field(..., description="device_key") -class BarkService(NotifierAdapter): +class BarkService(NotifierAdapter, NotifierRequestMixin): token: str = Field(..., description="device_key") base_url: str = Field("https://api.day.app", description="base_url") - async def _send(self, data: Dict[str, Any]) -> Any: - try: - async with aiohttp.ClientSession(base_url=self.base_url) as req: - resp: aiohttp.ClientResponse = await req.post("/push", data=data) + def _process_input(self, **kwargs): + notification: Optional[Notification] = kwargs.pop("notification", None) + record: Optional[logging.LogRecord] = kwargs.pop("record", None) - return await resp.json() + if notification: + message = self.template.format(**notification.dict()) + data = BarkMessage( + title=notification.official_title, + body=message, + icon=notification.poster_path, + device_key=self.token, + ) + return data - except Exception as e: - logger.error(f"Bark notification error: {e}") + if record: + if hasattr(record, "asctime"): + dt = record.asctime + else: + dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - def send(self, notification: Notification, *args, **kwargs): - message = self.template.format(**notification.dict()) + message = DEFAULT_LOG_TEMPLATE.format( + dt=dt, + levelname=record.levelname, + msg=record.msg, + ) + data = BarkMessage(body=message, device_key=self.token) + return data - data = BarkMessage( - title=notification.official_title, - body=message, - icon=notification.poster_path, - device_key=self.token, - ).dict() + raise ValueError("Can't get notification or record input.") + def send(self, **kwargs): + data = self._process_input(**kwargs) loop = asyncio.get_event_loop() - res = loop.run_until_complete(self._send(data=data)) + req = self.asend( + entrypoint="/push", + base_url=self.base_url, + method="POST", + data=data.dict(), + ) + res = loop.run_until_complete(req) if res: logger.debug(f"Bark notification: {res}") diff --git a/backend/src/module/notification/services/gotify.py b/backend/src/module/notification/services/gotify.py index 8406f1f7..9f20403d 100644 --- a/backend/src/module/notification/services/gotify.py +++ b/backend/src/module/notification/services/gotify.py @@ -1,12 +1,16 @@ import asyncio import logging +from datetime import datetime from typing import Any, Dict, Optional -import aiohttp from pydantic import BaseModel, Field from module.models import Notification -from module.notification.base import NotifierAdapter +from module.notification.base import ( + DEFAULT_LOG_TEMPLATE, + NotifierAdapter, + NotifierRequestMixin, +) logger = logging.getLogger(__name__) @@ -21,31 +25,48 @@ class GotifyMessage(BaseModel): ) -class GotifyService(NotifierAdapter): +class GotifyService(NotifierAdapter, NotifierRequestMixin): """GotifyService is a class for gotify notification service""" token: str = Field(..., description="gotify client or app token") base_url: str = Field(..., description="gotify base url") - async def _send(self, data: Dict[str, Any]) -> Any: - async with aiohttp.ClientSession(base_url=self.base_url) as req: - try: - resp: aiohttp.ClientResponse = await req.post( - "/message", params={"token": self.token}, data=data - ) + def _process_input(self, **kwargs): + notification: Optional[Notification] = kwargs.pop("notification", None) + record: Optional[logging.LogRecord] = kwargs.pop("record", None) - return await resp.json() + if notification: + message = self.template.format(**notification.dict()) + data = GotifyMessage(message=message) + return data - except Exception as e: - logger.error(f"Gotify notification error: {e}") + if record: + if hasattr(record, "asctime"): + dt = record.asctime + else: + dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - def send(self, notification: Notification, *args, **kwargs): - message = self.template.format(**notification.dict()) + message = DEFAULT_LOG_TEMPLATE.format( + dt=dt, + levelname=record.levelname, + msg=record.msg, + ) + data = GotifyMessage(message=message, priority=8) + return data - # TODO: priority should be aliased with log level - data = GotifyMessage(message=message).dict() + raise ValueError("Can't get notification or record input.") + + def send(self, **kwargs) -> Any: + data = self._process_input(**kwargs) loop = asyncio.get_event_loop() - res = loop.run_until_complete(self._send(data=data)) + req = self.asend( + entrypoint="/message", + base_url=self.base_url, + method="POST", + params={"token": self.token}, + data=data.dict(), + ) + res = loop.run_until_complete(req) if res: logger.debug(f"Gotify notification: {res}") diff --git a/backend/src/module/notification/services/server_chan.py b/backend/src/module/notification/services/server_chan.py index 7bd4a3dc..f4b8bb8c 100644 --- a/backend/src/module/notification/services/server_chan.py +++ b/backend/src/module/notification/services/server_chan.py @@ -1,12 +1,16 @@ import asyncio import logging -from typing import Any, Dict +from datetime import datetime +from typing import Optional -import aiohttp from pydantic import BaseModel, Field from module.models import Notification -from module.notification.base import NotifierAdapter +from module.notification.base import ( + DEFAULT_LOG_TEMPLATE, + NotifierAdapter, + NotifierRequestMixin, +) logger = logging.getLogger(__name__) @@ -16,32 +20,47 @@ class ServerChanMessage(BaseModel): desp: str = Field(..., description="description") -class ServerChanService(NotifierAdapter): +class ServerChanService(NotifierAdapter, NotifierRequestMixin): token: str = Field(..., description="server chan token") base_url: str = Field("https://sctapi.ftqq.com", description="server chan base url") - async def _send(self, data: Dict[str, Any]) -> Any: - async with aiohttp.ClientSession(base_url=self.base_url) as req: - try: - resp: aiohttp.ClientResponse = await req.get( - f"/{self.token}.send", params=data - ) + def _process_input(self, **kwargs): + notification: Optional[Notification] = kwargs.pop("notification", None) + record: Optional[logging.LogRecord] = kwargs.pop("record", None) - return await resp.json() + if notification: + message = self.template.format(**notification.dict()) + data = ServerChanMessage( + title=notification.official_title, + desp=message, + ) + return data - except Exception as e: - logger.error(f"ServerChan notification error: {e}") + if record: + if hasattr(record, "asctime"): + dt = record.asctime + else: + dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - def send(self, notification: Notification, *args, **kwargs): - message = self.template.format(**notification.dict()) + message = DEFAULT_LOG_TEMPLATE.format( + dt=dt, + levelname=record.levelname, + msg=record.msg, + ) + data = ServerChanMessage(desp=message) + return data - data = ServerChanMessage( - title=notification.official_title, - desp=message, - ).dict() + raise ValueError("Can't get notification or record input.") + def send(self, **kwargs): + data = self._process_input(**kwargs) loop = asyncio.get_event_loop() - res = loop.run_until_complete(self._send(data=data)) + req = self.asend( + entrypoint=f"/{self.token}.send", + base_url=self.base_url, + params=data.dict(), + ) + res = loop.run_until_complete(req) if res: logger.debug(f"Telegram notification: {res}") diff --git a/backend/src/module/notification/services/slack.py b/backend/src/module/notification/services/slack.py index f5211f75..b8c5b05b 100644 --- a/backend/src/module/notification/services/slack.py +++ b/backend/src/module/notification/services/slack.py @@ -1,12 +1,16 @@ import asyncio import logging -from typing import Any, Dict, List +from datetime import datetime +from typing import List, Optional -import aiohttp from pydantic import BaseModel, Field from module.models import Notification -from module.notification.base import NotifierAdapter +from module.notification.base import ( + DEFAULT_LOG_TEMPLATE, + NotifierAdapter, + NotifierRequestMixin, +) logger = logging.getLogger(__name__) @@ -22,41 +26,65 @@ class SlackMessage(BaseModel): attechment: List[SlackAttachment] = Field(..., description="attechments") -class SlackService(NotifierAdapter): +class SlackService(NotifierAdapter, NotifierRequestMixin): token: str = Field(..., description="slack token") channel: str = Field(..., description="slack channel id") base_url: str = Field("https://slack.com", description="slack base url") - async def _send(self, data: Dict[str, Any]) -> Any: - async with aiohttp.ClientSession(base_url=self.base_url) as req: - try: - resp: aiohttp.ClientResponse = await req.post( - "/api/chat.postMessage", - headers={"Authorization": f"Bearer {self.token}"}, - data=data, - ) - - return await resp.json() - - except Exception as e: - logger.error(f"Slack notification error: {e}") - - def send(self, notification: Notification, *args, **kwargs): - message = self.template.format(**notification.dict()) - - data = SlackMessage( - channel=self.channel, - attechment=[ - SlackAttachment( - title=notification.official_title, - text=message, - image_url=notification.poster_path, - ) - ], - ).dict() - + def _process_input(self, **kwargs): + notification: Optional[Notification] = kwargs.pop("notification", None) + record: Optional[logging.LogRecord] = kwargs.pop("record", None) + + if notification: + message = self.template.format(**notification.dict()) + data = SlackMessage( + channel=self.channel, + attechment=[ + SlackAttachment( + title=notification.official_title, + text=message, + image_url=notification.poster_path, + ) + ], + ) + return data + + if record: + if hasattr(record, "asctime"): + dt = record.asctime + else: + dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + message = DEFAULT_LOG_TEMPLATE.format( + dt=dt, + levelname=record.levelname, + msg=record.msg, + ) + + data = SlackMessage( + channel=self.channel, + attechment=[ + SlackAttachment( + title="AutoBangumi", + text=message, + ) + ], + ) + return data + + raise ValueError("Can't get notification or record input.") + + def send(self, **kwargs): + data = self._process_input(**kwargs) loop = asyncio.get_event_loop() - res = loop.run_until_complete(self._send(data=data)) + req = self.asend( + entrypoint="/api/chat.postMessage", + base_url=self.base_url, + method="POST", + headers={"Authorization": f"Bearer {self.token}"}, + data=data.dict(), + ) + res = loop.run_until_complete(req) if res: logger.debug(f"Telegram notification: {res}") diff --git a/backend/src/module/notification/services/telegram.py b/backend/src/module/notification/services/telegram.py index 7abece78..debb939a 100644 --- a/backend/src/module/notification/services/telegram.py +++ b/backend/src/module/notification/services/telegram.py @@ -1,12 +1,16 @@ import asyncio import logging -from typing import Any, Dict +from datetime import datetime +from typing import Optional -import aiohttp from pydantic import BaseModel, Field from module.models import Notification -from module.notification.base import NotifierAdapter +from module.notification.base import ( + DEFAULT_LOG_TEMPLATE, + NotifierAdapter, + NotifierRequestMixin, +) logger = logging.getLogger(__name__) @@ -19,7 +23,7 @@ class TelegramPhotoMessage(BaseModel): disable_notification: bool = True -class TelegramService(NotifierAdapter): +class TelegramService(NotifierAdapter, NotifierRequestMixin): """TelegramService is a class for telegram notification service""" token: str = Field(..., description="telegram bot token") @@ -29,30 +33,49 @@ class TelegramService(NotifierAdapter): description="telegram bot base url", ) - async def _send(self, data: Dict[str, Any], **kwargs) -> Any: - async with aiohttp.ClientSession(base_url=self.base_url) as req: - try: - resp: aiohttp.ClientResponse = await req.post( - f"/bot{self.token}/sendPhoto", data=data - ) - - return await resp.json() - - except Exception as e: - logger.error(f"Telegram notification error: {e}") - return - - def send(self, notification: Notification, *args, **kwargs): - message = self.template.format(**notification.dict()) - - data = TelegramPhotoMessage( - chat_id=self.chat_id, - caption=message, - photo=notification.poster_path, - ).dict() - + def _process_input(self, **kwargs): + notification: Optional[Notification] = kwargs.pop("notification", None) + record: Optional[logging.LogRecord] = kwargs.pop("record", None) + + if notification: + message = self.template.format(**notification.dict()) + data = TelegramPhotoMessage( + chat_id=self.chat_id, + caption=message, + photo=notification.poster_path, + ) + return data + + if record: + if hasattr(record, "asctime"): + dt = record.asctime + else: + dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + message = DEFAULT_LOG_TEMPLATE.format( + dt=dt, + levelname=record.levelname, + msg=record.msg, + ) + + data = TelegramPhotoMessage( + chat_id=self.chat_id, + caption=message, + photo="https://article.biliimg.com/bfs/article/d8bcd0408bf32594fd82f27de7d2c685829d1b2e.png", + ) + + raise ValueError("Can't get notification or record input.") + + def send(self, **kwargs): + data = self._process_input(**kwargs) loop = asyncio.get_event_loop() - res = loop.run_until_complete(self._send(data)) + req = self.asend( + entrypoint=f"/bot{self.token}/sendPhoto", + base_url=self.base_url, + method="POST", + data=data, + ) + res = loop.run_until_complete(req) if res: logger.debug(f"Telegram notification: {res}") diff --git a/backend/src/module/notification/services/wecom.py b/backend/src/module/notification/services/wecom.py index c7c5f5bc..12121690 100644 --- a/backend/src/module/notification/services/wecom.py +++ b/backend/src/module/notification/services/wecom.py @@ -1,12 +1,16 @@ import asyncio import logging -from typing import Any, Dict, List +from datetime import datetime +from typing import List, Optional -import aiohttp from pydantic import BaseModel, Field, validator from module.models import Notification -from module.notification.base import NotifierAdapter +from module.notification.base import ( + DEFAULT_LOG_TEMPLATE, + NotifierAdapter, + NotifierRequestMixin, +) logger = logging.getLogger(__name__) @@ -31,7 +35,7 @@ class WecomMessage(BaseModel): articles: List[WecomArticle] = Field(..., description="articles") -class WecomService(NotifierAdapter): +class WecomService(NotifierAdapter, NotifierRequestMixin): token: str = Field(..., description="wecom access token") agentid: str = Field(..., description="wecom agent id") base_url: str = Field( @@ -39,38 +43,61 @@ class WecomService(NotifierAdapter): description="wecom notification url", ) - async def _send(self, data: Dict[str, Any], **kwargs) -> Any: - async with aiohttp.ClientSession(base_url=self.base_url) as req: - try: - resp: aiohttp.ClientResponse = await req.post( - "/cgi-bin/message/send", - params={"access_token": self.token}, - data=data, - ) - - return await resp.json() - - except Exception as e: - logger.error(f"Wecom notification error: {e}") - - def send(self, notification: Notification, *args, **kwargs): - message = self.template.format(**notification.dict()) - - title = "【番剧更新】" + notification.official_title - - data = WecomMessage( - agentid=self.agentid, - articles=[ - WecomArticle( - title=title, - description=message, - picurl=notification.poster_path, - ) - ], - ).dict() - + def _process_input(self, **kwargs): + notification: Optional[Notification] = kwargs.pop("notification", None) + record: Optional[logging.LogRecord] = kwargs.pop("record", None) + + if notification: + message = self.template.format(**notification.dict()) + title = "【番剧更新】" + notification.official_title + data = WecomMessage( + agentid=self.agentid, + articles=[ + WecomArticle( + title=title, + description=message, + picurl=notification.poster_path, + ) + ], + ) + return data + + if record: + if hasattr(record, "asctime"): + dt = record.asctime + else: + dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + message = DEFAULT_LOG_TEMPLATE.format( + dt=dt, + levelname=record.levelname, + msg=record.msg, + ) + + data = WecomMessage( + agentid=self.agentid, + articles=[ + WecomArticle( + description=message, + picurl="https://article.biliimg.com/bfs/article/d8bcd0408bf32594fd82f27de7d2c685829d1b2e.png", + ) + ], + ) + return data + + raise ValueError("Can't get notification or record input.") + + def send(self, **kwargs): + data = self._process_input(**kwargs) loop = asyncio.get_event_loop() - res = loop.run_until_complete(self._send(data=data)) + req = self.asend( + entrypoint="/cgi-bin/message/send", + base_url=self.base_url, + method="POST", + params={"access_token": self.token}, + data=data.dict(), + ) + res = loop.run_until_complete(req) if res: logger.debug(f"Telegram notification: {res}") diff --git a/backend/src/test/notification/services/test_bark.py b/backend/src/test/notification/services/test_bark.py index 99819fe1..29e26ba3 100644 --- a/backend/src/test/notification/services/test_bark.py +++ b/backend/src/test/notification/services/test_bark.py @@ -44,28 +44,6 @@ def test_init_properties(self): assert self.bark.token == self.token assert self.bark.base_url == "https://api.day.app" - @pytest.mark.asyncio - async def test__send(self, fake_notification): - # Create a mock response for the HTTP request - with aioresponses() as m: - m.post("https://api.day.app/push") - - data = BarkMessage( - title=fake_notification.official_title, - body="test message", - icon=fake_notification.poster_path, - device_key=self.token, - ) - - # Call the send method - await self.bark._send(data.dict()) - - m.assert_called_once_with( - "/push", - method="POST", - data=data.dict(), - ) - def test_send(self, fake_notification): with mock.patch("module.notification.services.bark.BarkService.send") as m: return_value = {"errcode": 0, "errmsg": "ok"} diff --git a/backend/src/test/notification/services/test_gotify.py b/backend/src/test/notification/services/test_gotify.py index 95a0b1d1..d10cb67a 100644 --- a/backend/src/test/notification/services/test_gotify.py +++ b/backend/src/test/notification/services/test_gotify.py @@ -63,28 +63,6 @@ def test_init_properties(self): assert self.gotify.token == self.token assert self.gotify.base_url == self.base_url - @pytest.mark.asyncio - async def test__send(self, fake_notification): - # Create a mock response for the HTTP request - with aioresponses() as m: - m.post(f"https://example.com/message?token={self.token}") - - data = GotifyMessage( - title=fake_notification.official_title, - message="test message", - priority=10, - ) - - # Call the send method - await self.gotify._send(data.dict()) - - m.assert_called_once_with( - "/message", - method="POST", - params={"token": self.token}, - data=data.dict(), - ) - def test_send(self, fake_notification): with mock.patch("module.notification.services.gotify.GotifyService.send") as m: return_value = {"errcode": 0, "errmsg": "ok"} diff --git a/backend/src/test/notification/services/test_server_chan.py b/backend/src/test/notification/services/test_server_chan.py index 3035edd5..957be770 100644 --- a/backend/src/test/notification/services/test_server_chan.py +++ b/backend/src/test/notification/services/test_server_chan.py @@ -42,23 +42,6 @@ def test_init_properties(self): assert self.server_chan.token == self.token assert self.server_chan.base_url == self.base_url - @pytest.mark.asyncio - async def test__send(self, fake_notification): - # Create a mock response for the HTTP request - with aioresponses() as m: - m.get(f"https://sctapi.ftqq.com/{self.token}.send") - - data = ServerChanMessage(title=fake_notification.official_title, desp="foo") - - # Call the send method - await self.server_chan._send(data.dict()) - - m.assert_called_once_with( - f"/{self.token}.send", - method="GET", - params=data.dict(), - ) - def test_send(self, fake_notification): with mock.patch( "module.notification.services.server_chan.ServerChanService.send" diff --git a/backend/src/test/notification/services/test_slack.py b/backend/src/test/notification/services/test_slack.py index 0be7e784..063ee230 100644 --- a/backend/src/test/notification/services/test_slack.py +++ b/backend/src/test/notification/services/test_slack.py @@ -58,36 +58,6 @@ def test_init_properties(self): assert self.slack.channel == self.channel assert self.slack.base_url == self.base_url - @pytest.mark.asyncio - async def test__send(self, fake_notification): - # Create a mock response for the HTTP request - with aioresponses() as m: - m.post( - "https://slack.com/api/chat.postMessage", - headers={"Authorization": f"Bearer {self.token}"}, - ) - - data = SlackMessage( - channel=self.channel, - attechment=[ - SlackAttachment( - title=fake_notification.official_title, - text=fake_notification.poster_path, - image_url=fake_notification.poster_path, - ) - ], - ) - - # Call the send method - await self.slack._send(data.dict()) - - m.assert_called_once_with( - "/api/chat.postMessage", - method="POST", - headers={"Authorization": f"Bearer {self.token}"}, - data=data.dict(), - ) - def test_send(self, fake_notification): with mock.patch("module.notification.services.slack.SlackService.send") as m: return_value = {"errcode": 0, "errmsg": "ok"} diff --git a/backend/src/test/notification/services/test_telegram.py b/backend/src/test/notification/services/test_telegram.py index 0918f0e5..53a54485 100644 --- a/backend/src/test/notification/services/test_telegram.py +++ b/backend/src/test/notification/services/test_telegram.py @@ -41,26 +41,6 @@ def test_init_properties(self): assert self.telegram.chat_id == self.chat_id assert self.telegram.base_url == self.base_url - @pytest.mark.asyncio - async def test__send(self, fake_notification): - # Create a mock response for the HTTP request - with aioresponses() as m: - data = TelegramPhotoMessage( - chat_id=self.chat_id, - caption="Test Caption", - photo=fake_notification.poster_path, - ) - m.post(f"https://api.telegram.org/bot{self.token}/sendPhoto") - - # Call the send method - await self.telegram._send(data.dict()) - - m.assert_called_once_with( - f"/bot{self.token}/sendPhoto", - method="POST", - data=data.dict(), - ) - def test_send(self, fake_notification): with mock.patch( "module.notification.services.telegram.TelegramService.send" diff --git a/backend/src/test/notification/services/test_wecom.py b/backend/src/test/notification/services/test_wecom.py index 11bc18ca..5207da4f 100644 --- a/backend/src/test/notification/services/test_wecom.py +++ b/backend/src/test/notification/services/test_wecom.py @@ -70,33 +70,6 @@ def test_init_properties(self): assert self.wecom.agentid == self.agentid assert self.wecom.base_url == self.base_url - @pytest.mark.asyncio - async def test__send(self, fake_notification): - # Create a mock response for the HTTP request - with aioresponses() as m: - m.post("https://qyapi.weixin.qq.com/cgi-bin/message/send") - - data = WecomMessage( - agentid=self.agentid, - articles=[ - WecomArticle( - title="Test Title", - description="", - picurl="https://example.com/image.jpg", - ) - ], - ) - - # Call the send method - await self.wecom._send(data.dict()) - - m.assert_called_once_with( - "/cgi-bin/message/send", - method="POST", - data=data.dict(), - params={"access_token": "YOUR_TOKEN"}, - ) - def test_send(self, fake_notification): with mock.patch("module.notification.services.wecom.WecomService.send") as m: return_value = {"errcode": 0, "errmsg": "ok"} diff --git a/backend/src/test/notification/test_base.py b/backend/src/test/notification/test_base.py index 31d76464..71ed3016 100644 --- a/backend/src/test/notification/test_base.py +++ b/backend/src/test/notification/test_base.py @@ -6,6 +6,7 @@ DEFAULT_LOG_TEMPLATE, DEFAULT_MESSAGE_TEMPLATE, NotifierAdapter, + NotifierRequestMixin, ) @@ -45,3 +46,29 @@ def test_NotifierAdapter_non_implementation_raised(fake_notification): NotifierAdapter.send(fake_notification) assert exc.match("send method is not implemented yet.") + + +@pytest.mark.asyncio +async def test_NotifierRequestMixin_asend(): + with aioresponses() as m: + m.post( + "https://example.com?foo=bar", + headers={"Content-Type": "application/json"}, + payload={"hello": "world"}, + ) + + await NotifierRequestMixin().asend( + entrypoint="https://example.com", + method="POST", + params={"foo": "bar"}, + data={"hello": "world"}, + headers={"Content-Type": "application/json"}, + ) + + m.assert_called_once_with( + "https://example.com", + method="POST", + params={"foo": "bar"}, + data={"hello": "world"}, + headers={"Content-Type": "application/json"}, + )