From ddf6f5c0b6d8c1f4aea469e04aee459f9274287d Mon Sep 17 00:00:00 2001 From: jxxghp Date: Fri, 25 Aug 2023 12:44:59 +0800 Subject: [PATCH] =?UTF-8?q?feat=20=E7=A7=8D=E5=AD=90=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E6=8B=86=E5=88=86=E4=B8=BA=E7=8B=AC=E7=AB=8B=E7=9A=84=E6=A8=A1?= =?UTF-8?q?=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/chain/subscribe.py | 76 +++------------------------- app/chain/torrents.py | 109 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 68 deletions(-) create mode 100644 app/chain/torrents.py diff --git a/app/chain/subscribe.py b/app/chain/subscribe.py index 441d3bb07..2ecc3c00d 100644 --- a/app/chain/subscribe.py +++ b/app/chain/subscribe.py @@ -8,7 +8,7 @@ from app.chain import ChainBase from app.chain.download import DownloadChain from app.chain.search import SearchChain -from app.core.config import settings +from app.chain.torrents import TorrentsChain from app.core.context import TorrentInfo, Context, MediaInfo from app.core.meta import MetaBase from app.core.metainfo import MetaInfo @@ -16,11 +16,9 @@ from app.db.subscribe_oper import SubscribeOper from app.db.systemconfig_oper import SystemConfigOper from app.helper.message import MessageHelper -from app.helper.sites import SitesHelper from app.log import logger from app.schemas import NotExistMediaInfo, Notification from app.schemas.types import MediaType, SystemConfigKey, MessageChannel, NotificationType -from app.utils.string import StringUtils class SubscribeChain(ChainBase): @@ -28,14 +26,12 @@ class SubscribeChain(ChainBase): 订阅管理处理链 """ - _cache_file = "__torrents_cache__" - def __init__(self, db: Session = None): super().__init__(db) self.downloadchain = DownloadChain(self._db) self.searchchain = SearchChain(self._db) self.subscribeoper = SubscribeOper(self._db) - self.siteshelper = SitesHelper() + self.torrentschain = TorrentsChain() self.message = MessageHelper() self.systemconfig = SystemConfigOper(self._db) @@ -362,73 +358,17 @@ def finish_subscribe_or_not(self, subscribe: Subscribe, meta: MetaInfo, def refresh(self): """ - 刷新站点最新资源 + 刷新订阅 """ - # 所有订阅 + # 查询所有订阅 subscribes = self.subscribeoper.list('R') if not subscribes: # 没有订阅不运行 return - # 读取缓存 - torrents_cache: Dict[str, List[Context]] = self.load_cache(self._cache_file) or {} - - # 所有站点索引 - indexers = self.siteshelper.get_indexers() - # 配置的索引站点 - config_indexers = [str(sid) for sid in self.systemconfig.get(SystemConfigKey.IndexerSites) or []] - # 遍历站点缓存资源 - for indexer in indexers: - # 未开启的站点不搜索 - if config_indexers and str(indexer.get("id")) not in config_indexers: - continue - logger.info(f'开始刷新 {indexer.get("name")} 最新种子 ...') - domain = StringUtils.get_url_domain(indexer.get("domain")) - torrents: List[TorrentInfo] = self.refresh_torrents(site=indexer) - # 按pubdate降序排列 - torrents.sort(key=lambda x: x.pubdate or '', reverse=True) - # 取前N条 - torrents = torrents[:settings.CACHE_CONF.get('refresh')] - if torrents: - # 过滤出没有处理过的种子 - torrents = [torrent for torrent in torrents - if f'{torrent.title}{torrent.description}' - not in [f'{t.torrent_info.title}{t.torrent_info.description}' - for t in torrents_cache.get(domain) or []]] - if torrents: - logger.info(f'{indexer.get("name")} 有 {len(torrents)} 个新种子') - else: - logger.info(f'{indexer.get("name")} 没有新种子') - continue - for torrent in torrents: - logger.info(f'处理资源:{torrent.title} ...') - # 识别 - meta = MetaInfo(title=torrent.title, subtitle=torrent.description) - # 识别媒体信息 - mediainfo: MediaInfo = self.recognize_media(meta=meta) - if not mediainfo: - logger.warn(f'未识别到媒体信息,标题:{torrent.title}') - # 存储空的媒体信息 - mediainfo = MediaInfo() - # 清理多余数据 - mediainfo.clear() - # 上下文 - context = Context(meta_info=meta, media_info=mediainfo, torrent_info=torrent) - # 添加到缓存 - if not torrents_cache.get(domain): - torrents_cache[domain] = [context] - else: - torrents_cache[domain].append(context) - # 如果超过了限制条数则移除掉前面的 - if len(torrents_cache[domain]) > settings.CACHE_CONF.get('torrents'): - torrents_cache[domain] = torrents_cache[domain][-settings.CACHE_CONF.get('torrents'):] - # 回收资源 - del torrents - else: - logger.info(f'{indexer.get("name")} 没有获取到种子') - # 从缓存中匹配订阅 - self.match(torrents_cache) - # 保存缓存到本地 - self.save_cache(torrents_cache, self._cache_file) + # 刷新站点资源,从缓存中匹配订阅 + self.match( + self.torrentschain.refresh() + ) def match(self, torrents: Dict[str, List[Context]]): """ diff --git a/app/chain/torrents.py b/app/chain/torrents.py new file mode 100644 index 000000000..63177baa6 --- /dev/null +++ b/app/chain/torrents.py @@ -0,0 +1,109 @@ +from typing import Dict, List, Union + +from requests import Session + +from app.chain import ChainBase +from app.core.config import settings +from app.core.context import TorrentInfo, Context, MediaInfo +from app.core.metainfo import MetaInfo +from app.db.systemconfig_oper import SystemConfigOper +from app.helper.sites import SitesHelper +from app.log import logger +from app.schemas import Notification +from app.schemas.types import SystemConfigKey, MessageChannel +from app.utils.string import StringUtils + + +class TorrentsChain(ChainBase): + """ + 种子刷新处理链 + """ + + _cache_file = "__torrents_cache__" + + def __init__(self, db: Session = None): + super().__init__(db) + self.siteshelper = SitesHelper() + self.systemconfig = SystemConfigOper(self._db) + + def remote_refresh(self, channel: MessageChannel, userid: Union[str, int] = None): + """ + 远程刷新订阅,发送消息 + """ + self.post_message(Notification(channel=channel, + title=f"开始刷新种子 ...", userid=userid)) + self.refresh() + self.post_message(Notification(channel=channel, + title=f"种子刷新完成!", userid=userid)) + + def get_torrents(self) -> Dict[str, List[Context]]: + """ + 获取当前缓存的种子 + """ + # 读取缓存 + return self.load_cache(self._cache_file) or {} + + def refresh(self) -> Dict[str, List[Context]]: + """ + 刷新站点最新资源 + """ + # 读取缓存 + torrents_cache = self.get_torrents() + + # 所有站点索引 + indexers = self.siteshelper.get_indexers() + # 配置的索引站点 + config_indexers = [str(sid) for sid in self.systemconfig.get(SystemConfigKey.IndexerSites) or []] + # 遍历站点缓存资源 + for indexer in indexers: + # 未开启的站点不搜索 + if config_indexers and str(indexer.get("id")) not in config_indexers: + continue + logger.info(f'开始刷新 {indexer.get("name")} 最新种子 ...') + domain = StringUtils.get_url_domain(indexer.get("domain")) + torrents: List[TorrentInfo] = self.refresh_torrents(site=indexer) + # 按pubdate降序排列 + torrents.sort(key=lambda x: x.pubdate or '', reverse=True) + # 取前N条 + torrents = torrents[:settings.CACHE_CONF.get('refresh')] + if torrents: + # 过滤出没有处理过的种子 + torrents = [torrent for torrent in torrents + if f'{torrent.title}{torrent.description}' + not in [f'{t.torrent_info.title}{t.torrent_info.description}' + for t in torrents_cache.get(domain) or []]] + if torrents: + logger.info(f'{indexer.get("name")} 有 {len(torrents)} 个新种子') + else: + logger.info(f'{indexer.get("name")} 没有新种子') + continue + for torrent in torrents: + logger.info(f'处理资源:{torrent.title} ...') + # 识别 + meta = MetaInfo(title=torrent.title, subtitle=torrent.description) + # 识别媒体信息 + mediainfo: MediaInfo = self.recognize_media(meta=meta) + if not mediainfo: + logger.warn(f'未识别到媒体信息,标题:{torrent.title}') + # 存储空的媒体信息 + mediainfo = MediaInfo() + # 清理多余数据 + mediainfo.clear() + # 上下文 + context = Context(meta_info=meta, media_info=mediainfo, torrent_info=torrent) + # 添加到缓存 + if not torrents_cache.get(domain): + torrents_cache[domain] = [context] + else: + torrents_cache[domain].append(context) + # 如果超过了限制条数则移除掉前面的 + if len(torrents_cache[domain]) > settings.CACHE_CONF.get('torrents'): + torrents_cache[domain] = torrents_cache[domain][-settings.CACHE_CONF.get('torrents'):] + # 回收资源 + del torrents + else: + logger.info(f'{indexer.get("name")} 没有获取到种子') + # 保存缓存到本地 + self.save_cache(torrents_cache, self._cache_file) + # 返回 + return torrents_cache