diff --git a/Discord/cogs/runescape.py b/Discord/cogs/runescape.py
index 8d89f7fd2a..5ffbf7e74b 100644
--- a/Discord/cogs/runescape.py
+++ b/Discord/cogs/runescape.py
@@ -9,6 +9,7 @@
 sys.path.insert(0, "..")
 from units.runescape import get_ge_data, get_item_id, get_monster_data
+from units.wikis import search_wiki
 sys.path.pop(0)
 
 
 async def setup(bot):
@@ -160,9 +161,20 @@ async def wiki(self, ctx, *, query):
             Search query
         """
         await ctx.defer()
-        await ctx.bot.cogs["Search"].process_wiki(
-            ctx, "https://runescape.wiki/api.php", query
-        )
+        try:
+            article = await search_wiki(
+                "https://runescape.wiki/api.php", query,
+                aiohttp_session = ctx.bot.aiohttp_session
+            )
+        except ValueError as e:
+            await ctx.embed_reply(f"{ctx.bot.error_emoji} {e}")
+        else:
+            await ctx.embed_reply(
+                title = article.title,
+                title_url = article.url,
+                description = article.extract,
+                image_url = article.image_url
+            )
 
     @runescape.command(hidden = True, with_app_command = False)
     async def zybez(self, ctx):
diff --git a/Discord/cogs/search.py b/Discord/cogs/search.py
index efcad43889..c36ee122b8 100644
--- a/Discord/cogs/search.py
+++ b/Discord/cogs/search.py
@@ -6,6 +6,7 @@
 import functools
 import inspect
 import re
+import sys
 from typing import Optional
 
 from bs4 import BeautifulSoup
@@ -15,6 +16,10 @@
 from utilities.menu_sources import WolframAlphaSource
 from utilities.paginators import ButtonPaginator
 
+sys.path.insert(0, "..")
+from units.wikis import search_wiki
+sys.path.pop(0)
+
 
 async def setup(bot):
     await bot.add_cog(Search(bot))
@@ -285,18 +290,41 @@ async def process_uesp(self, ctx, search, random = False, redirect = True):
     )
     async def wikipedia(self, ctx, *, query: str):
         """Search for an article on Wikipedia"""
-        await self.process_wiki(
-            ctx, "https://en.wikipedia.org/w/api.php", query
-        )
+        try:
+            article = await search_wiki(
+                "https://en.wikipedia.org/w/api.php", query,
+                aiohttp_session = ctx.bot.aiohttp_session
+            )
+        except ValueError as e:
+            await ctx.embed_reply(f"{ctx.bot.error_emoji} {e}")
+        else:
+            await ctx.embed_reply(
+                title = article.title,
+                title_url = article.url,
+                description = article.extract,
+                image_url = article.image_url
+            )
 
     @wikipedia.command(name = "random")
     async def wikipedia_random(self, ctx):
         """Random Wikipedia article"""
         # Note: random wikipedia command invokes this command
         await ctx.defer()
-        await self.process_wiki(
-            ctx, "https://en.wikipedia.org/w/api.php", None, random = True
-        )
+        try:
+            article = await search_wiki(
+                "https://en.wikipedia.org/w/api.php", None,
+                aiohttp_session = ctx.bot.aiohttp_session,
+                random = True
+            )
+        except ValueError as e:
+            await ctx.embed_reply(f"{ctx.bot.error_emoji} {e}")
+        else:
+            await ctx.embed_reply(
+                title = article.title,
+                title_url = article.url,
+                description = article.extract,
+                image_url = article.image_url
+            )
 
     @app_commands.command(name = "wikipedia")
     async def slash_wikipedia(self, interaction, *, query: str):
@@ -322,122 +350,37 @@ async def fandom(self, ctx):
     @fandom.command(aliases = ["lord_of_the_rings"])
     async def lotr(self, ctx, *, query: str):
         """Search for an article on The Lord of The Rings Wiki"""
-        await self.process_wiki(ctx, "https://lotr.fandom.com/api.php", query)
+        try:
+            article = await search_wiki(
+                "https://lotr.fandom.com/api.php", query,
+                aiohttp_session = ctx.bot.aiohttp_session
+            )
+        except ValueError as e:
+            await ctx.embed_reply(f"{ctx.bot.error_emoji} {e}")
+        else:
+            await ctx.embed_reply(
+                title = article.title,
+                title_url = article.url,
+                description = article.extract,
+                image_url = article.image_url
+            )
 
     @commands.command()
     async def tolkien(self, ctx, *, query: str):
         """Search for an article on Tolkien Gateway"""
-        await self.process_wiki(
-            ctx, "https://tolkiengateway.net/w/api.php", query
-        )
-
-    async def process_wiki(
-        self, ctx, url, search, random = False, redirect = True
-    ):
-        # TODO: Add User-Agent
-        # TODO: Use textwrap
-        if random:
-            async with ctx.bot.aiohttp_session.get(
-                url, params = {
-                    "action": "query", "list": "random", "rnnamespace": 0,
-                    "format": "json"
-                }
-            ) as resp:
-                data = await resp.json()
-
-            search = data["query"]["random"][0]["title"]
-        else:
-            async with ctx.bot.aiohttp_session.get(
-                url, params = {
-                    "action": "query", "list": "search", "srsearch": search,
-                    "srinfo": "suggestion", "srlimit": 1, "format": "json"
-                }
-            ) as resp:
-                data = await resp.json()
-
-            if search := data["query"]["search"]:
-                search = search[0]["title"]
-            elif not (
-                search := data["query"].get("searchinfo", {}).get("suggestion")
-            ):
-                await ctx.embed_reply(f"{ctx.bot.error_emoji} Page not found")
-                return
-
-        async with ctx.bot.aiohttp_session.get(
-            url, params = {
-                "action": "query", "redirects": "",
-                "prop": "info|extracts|pageimages", "titles": search,
-                "inprop": "url", "exintro": "", "explaintext": "",
-                "pithumbsize": 9000, "pilicense": "any", "format": "json"
-            } # TODO: Use exchars?
-        ) as resp:
-            data = await resp.json()
-
-        if "pages" not in data["query"]:
-            await ctx.embed_reply(f"{ctx.bot.error_emoji} Error")
-            return
-
-        page_id = list(data["query"]["pages"].keys())[0]
-        page = data["query"]["pages"][page_id]
-
-        if "missing" in page:
-            await ctx.embed_reply(f"{ctx.bot.error_emoji} Page not found")
-        elif "invalid" in page:
-            await ctx.embed_reply(
-                f"{ctx.bot.error_emoji} Error: {page['invalidreason']}"
-            )
-        elif redirect and "redirects" in data["query"]:
-            await self.process_wiki(
-                ctx, url, data["query"]["redirects"][-1]["to"],
-                redirect = False
+        try:
+            article = await search_wiki(
+                "https://tolkiengateway.net/w/api.php", query,
+                aiohttp_session = ctx.bot.aiohttp_session
             )
-            # TODO: Handle section links/tofragments
+        except ValueError as e:
+            await ctx.embed_reply(f"{ctx.bot.error_emoji} {e}")
         else:
-            thumbnail = data["query"]["pages"][page_id].get("thumbnail")
-
-            if "extract" not in page:
-                async with ctx.bot.aiohttp_session.get(
-                    url, params = {
-                        "action": "parse", "page": search, "prop": "text",
-                        "format": "json"
-                    }
-                ) as resp:
-                    data = await resp.json()
-
-                p = BeautifulSoup(
-                    data["parse"]["text"]['*'], "lxml"
-                ).body.div.find_all('p', recursive = False)
-
-                first_p = p[0]
-                if first_p.aside:
-                    first_p.aside.clear()
-                description = first_p.get_text()
-
-                if len(p) > 1:
-                    second_p = p[1]
-                    description += '\n' + second_p.get_text()
-
-                description = re.sub(
-                    r"\n\s*\n", "\n\n",
-                    description if len(description) <= 512
-                    else description[:512] + "..."
-                )
-            else:
-                description = re.sub(
-                    r"\s+ \s+", ' ',
-                    page["extract"] if len(page["extract"]) <= 512
-                    else page["extract"][:512] + "..."
-                )
-            await ctx.embed_reply(
-                title = page["title"],
-                title_url = page["fullurl"], # TODO: Use canonicalurl?
-                description = description,
-                image_url = (
-                    thumbnail["source"].replace(
-                        f"{thumbnail['width']}px", "1200px"
-                    ) if thumbnail else None
-                )
+            await ctx.embed_reply(
+                title = article.title,
+                title_url = article.url,
+                description = article.extract,
+                image_url = article.image_url
             )
 
     @commands.group(
diff --git a/units/wikis.py b/units/wikis.py
new file mode 100644
index 0000000000..3181dd0145
--- /dev/null
+++ b/units/wikis.py
@@ -0,0 +1,132 @@
+
+import re
+
+import aiohttp
+from bs4 import BeautifulSoup
+from pydantic import BaseModel
+
+
+class WikiArticle(BaseModel):
+    title: str
+    url: str
+    extract: str
+    image_url: str | None
+
+
+async def search_wiki(
+    url: str,
+    search: str,
+    *,
+    aiohttp_session: aiohttp.ClientSession | None = None,
+    random: bool = False,
+    redirect: bool = True
+) -> WikiArticle:
+    # TODO: Add User-Agent
+    # TODO: Use textwrap
+    if aiohttp_session_not_passed := (aiohttp_session is None):
+        aiohttp_session = aiohttp.ClientSession()
+    try:
+        if random:
+            async with aiohttp_session.get(
+                url, params = {
+                    "action": "query", "list": "random", "rnnamespace": 0,
+                    "format": "json"
+                }
+            ) as resp:
+                data = await resp.json()
+
+            search = data["query"]["random"][0]["title"]
+        else:
+            async with aiohttp_session.get(
+                url, params = {
+                    "action": "query", "list": "search", "srsearch": search,
+                    "srinfo": "suggestion", "srlimit": 1, "format": "json"
+                }
+            ) as resp:
+                data = await resp.json()
+
+            if search := data["query"]["search"]:
+                search = search[0]["title"]
+            elif not (
+                search := data["query"].get("searchinfo", {}).get("suggestion")
+            ):
+                raise ValueError("Page not found")
+
+        async with aiohttp_session.get(
+            url, params = {
+                "action": "query", "redirects": "",
+                "prop": "info|extracts|pageimages", "titles": search,
+                "inprop": "url", "exintro": "", "explaintext": "",
+                "pithumbsize": 9000, "pilicense": "any", "format": "json"
+            } # TODO: Use exchars?
+        ) as resp:
+            data = await resp.json()
+
+        if "pages" not in data["query"]:
+            raise ValueError("Error") # TODO: More descriptive error
+
+        page_id = list(data["query"]["pages"].keys())[0]
+        page = data["query"]["pages"][page_id]
+
+        if "missing" in page:
+            raise ValueError("Page not found")
+        elif "invalid" in page:
+            raise ValueError(page["invalidreason"])
+        elif redirect and "redirects" in data["query"]:
+            return await search_wiki(
+                url, data["query"]["redirects"][-1]["to"],
+                aiohttp_session = aiohttp_session,
+                redirect = False
+            )
+            # TODO: Handle section links/tofragments
+        else:
+            thumbnail = data["query"]["pages"][page_id].get("thumbnail")
+
+            if "extract" not in page:
+                async with aiohttp_session.get(
+                    url, params = {
+                        "action": "parse", "page": search, "prop": "text",
+                        "format": "json"
+                    }
+                ) as resp:
+                    data = await resp.json()
+
+                p = BeautifulSoup(
+                    data["parse"]["text"]['*'], "lxml"
+                ).body.div.find_all('p', recursive = False)
+
+                first_p = p[0]
+                if first_p.aside:
+                    first_p.aside.clear()
+                extract = first_p.get_text()
+
+                if len(p) > 1:
+                    second_p = p[1]
+                    extract += '\n' + second_p.get_text()
+
+                extract = re.sub(
+                    r"\n\s*\n", "\n\n",
+                    extract if len(extract) <= 512
+                    else extract[:512] + "..."
+                )
+            else:
+                extract = re.sub(
+                    r"\s+ \s+", ' ',
+                    page["extract"] if len(page["extract"]) <= 512
+                    else page["extract"][:512] + "..."
+                )
+
+            return WikiArticle(
+                title = page["title"],
+                url = page["fullurl"], # TODO: Use canonicalurl?
+                extract = extract,
+                image_url = (
+                    thumbnail["source"].replace(
+                        f"{thumbnail['width']}px", "1200px"
+                    ) if thumbnail else None
+                )
+            )
+    finally:
+        if aiohttp_session_not_passed:
+            await aiohttp_session.close()
+
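A minimal usage sketch of the new units.wikis.search_wiki helper, for reference. It assumes the units package is importable from the working directory (the cogs arrange this with sys.path inserts), and the query string here is only an illustrative example; when no aiohttp session is passed, the helper opens and closes its own, as the finally block above shows.

import asyncio

from units.wikis import search_wiki


async def main():
    try:
        # No aiohttp_session passed, so search_wiki creates and closes its own
        article = await search_wiki(
            "https://en.wikipedia.org/w/api.php", "MediaWiki"
        )
    except ValueError as error:
        # Raised for missing or invalid pages, mirroring the cogs' error replies
        print(error)
    else:
        print(article.title, article.url)
        print(article.extract)


asyncio.run(main())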