[Units] Add and use search_wiki function
[Discord] Replace process_wiki method in Search cog
Harmon758 committed Jul 25, 2023
1 parent 472ba7c commit eb74c63
Showing 3 changed files with 205 additions and 118 deletions.
18 changes: 15 additions & 3 deletions Discord/cogs/runescape.py
@@ -9,6 +9,7 @@

sys.path.insert(0, "..")
from units.runescape import get_ge_data, get_item_id, get_monster_data
from units.wikis import search_wiki
sys.path.pop(0)

async def setup(bot):
@@ -160,9 +161,20 @@ async def wiki(self, ctx, *, query):
Search query
"""
await ctx.defer()
await ctx.bot.cogs["Search"].process_wiki(
ctx, "https://runescape.wiki/api.php", query
)
try:
article = await search_wiki(
"https://runescape.wiki/api.php", query,
aiohttp_session = ctx.bot.aiohttp_session
)
except ValueError as e:
await ctx.embed_reply(f"{ctx.bot.error_emoji} {e}")
else:
await ctx.embed_reply(
title = article.title,
title_url = article.url,
description = article.extract,
image_url = article.image_url
)

@runescape.command(hidden = True, with_app_command = False)
async def zybez(self, ctx):
173 changes: 58 additions & 115 deletions Discord/cogs/search.py
@@ -6,6 +6,7 @@
import functools
import inspect
import re
import sys
from typing import Optional

from bs4 import BeautifulSoup
@@ -15,6 +16,10 @@
from utilities.menu_sources import WolframAlphaSource
from utilities.paginators import ButtonPaginator

sys.path.insert(0, "..")
from units.wikis import search_wiki
sys.path.pop(0)

async def setup(bot):
await bot.add_cog(Search(bot))

@@ -285,18 +290,41 @@ async def process_uesp(self, ctx, search, random = False, redirect = True):
)
async def wikipedia(self, ctx, *, query: str):
"""Search for an article on Wikipedia"""
await self.process_wiki(
ctx, "https://en.wikipedia.org/w/api.php", query
)
try:
article = await search_wiki(
"https://en.wikipedia.org/w/api.php", query,
aiohttp_session = ctx.bot.aiohttp_session
)
except ValueError as e:
await ctx.embed_reply(f"{ctx.bot.error_emoji} {e}")
else:
await ctx.embed_reply(
title = article.title,
title_url = article.url,
description = article.extract,
image_url = article.image_url
)

@wikipedia.command(name = "random")
async def wikipedia_random(self, ctx):
"""Random Wikipedia article"""
# Note: random wikipedia command invokes this command
await ctx.defer()
await self.process_wiki(
ctx, "https://en.wikipedia.org/w/api.php", None, random = True
)
try:
article = await search_wiki(
"https://en.wikipedia.org/w/api.php", None,
aiohttp_session = ctx.bot.aiohttp_session,
random = True
)
except ValueError as e:
await ctx.embed_reply(f"{ctx.bot.error_emoji} {e}")
else:
await ctx.embed_reply(
title = article.title,
title_url = article.url,
description = article.extract,
image_url = article.image_url
)

@app_commands.command(name = "wikipedia")
async def slash_wikipedia(self, interaction, *, query: str):
@@ -322,122 +350,37 @@ async def fandom(self, ctx):
@fandom.command(aliases = ["lord_of_the_rings"])
async def lotr(self, ctx, *, query: str):
"""Search for an article on The Lord of The Rings Wiki"""
await self.process_wiki(ctx, "https://lotr.fandom.com/api.php", query)
try:
article = await search_wiki(
"https://lotr.fandom.com/api.php", query,
aiohttp_session = ctx.bot.aiohttp_session
)
except ValueError as e:
await ctx.embed_reply(f"{ctx.bot.error_emoji} {e}")
else:
await ctx.embed_reply(
title = article.title,
title_url = article.url,
description = article.extract,
image_url = article.image_url
)

@commands.command()
async def tolkien(self, ctx, *, query: str):
"""Search for an article on Tolkien Gateway"""
await self.process_wiki(
ctx, "https://tolkiengateway.net/w/api.php", query
)

async def process_wiki(
self, ctx, url, search, random = False, redirect = True
):
# TODO: Add User-Agent
# TODO: Use textwrap
if random:
async with ctx.bot.aiohttp_session.get(
url, params = {
"action": "query", "list": "random", "rnnamespace": 0,
"format": "json"
}
) as resp:
data = await resp.json()

search = data["query"]["random"][0]["title"]
else:
async with ctx.bot.aiohttp_session.get(
url, params = {
"action": "query", "list": "search", "srsearch": search,
"srinfo": "suggestion", "srlimit": 1, "format": "json"
}
) as resp:
data = await resp.json()

if search := data["query"]["search"]:
search = search[0]["title"]
elif not (
search := data["query"].get("searchinfo", {}).get("suggestion")
):
await ctx.embed_reply(f"{ctx.bot.error_emoji} Page not found")
return

async with ctx.bot.aiohttp_session.get(
url, params = {
"action": "query", "redirects": "",
"prop": "info|extracts|pageimages", "titles": search,
"inprop": "url", "exintro": "", "explaintext": "",
"pithumbsize": 9000, "pilicense": "any", "format": "json"
} # TODO: Use exchars?
) as resp:
data = await resp.json()

if "pages" not in data["query"]:
await ctx.embed_reply(f"{ctx.bot.error_emoji} Error")
return

page_id = list(data["query"]["pages"].keys())[0]
page = data["query"]["pages"][page_id]

if "missing" in page:
await ctx.embed_reply(f"{ctx.bot.error_emoji} Page not found")
elif "invalid" in page:
await ctx.embed_reply(
f"{ctx.bot.error_emoji} Error: {page['invalidreason']}"
)
elif redirect and "redirects" in data["query"]:
await self.process_wiki(
ctx, url, data["query"]["redirects"][-1]["to"],
redirect = False
try:
article = await search_wiki(
"https://tolkiengateway.net/w/api.php", query,
aiohttp_session = ctx.bot.aiohttp_session
)
# TODO: Handle section links/tofragments
except ValueError as e:
await ctx.embed_reply(f"{ctx.bot.error_emoji} {e}")
else:
thumbnail = data["query"]["pages"][page_id].get("thumbnail")

if "extract" not in page:
async with ctx.bot.aiohttp_session.get(
url, params = {
"action": "parse", "page": search, "prop": "text",
"format": "json"
}
) as resp:
data = await resp.json()

p = BeautifulSoup(
data["parse"]["text"]['*'], "lxml"
).body.div.find_all('p', recursive = False)

first_p = p[0]
if first_p.aside:
first_p.aside.clear()
description = first_p.get_text()

if len(p) > 1:
second_p = p[1]
description += '\n' + second_p.get_text()

description = re.sub(
r"\n\s*\n", "\n\n",
description if len(description) <= 512
else description[:512] + "..."
)
else:
description = re.sub(
r"\s+ \s+", ' ',
page["extract"] if len(page["extract"]) <= 512
else page["extract"][:512] + "..."
)

await ctx.embed_reply(
title = page["title"],
title_url = page["fullurl"], # TODO: Use canonicalurl?
description = description,
image_url = (
thumbnail["source"].replace(
f"{thumbnail['width']}px", "1200px"
) if thumbnail else None
)
title = article.title,
title_url = article.url,
description = article.extract,
image_url = article.image_url
)

@commands.group(
132 changes: 132 additions & 0 deletions units/wikis.py
@@ -0,0 +1,132 @@

import re

import aiohttp
from bs4 import BeautifulSoup
from pydantic import BaseModel


class WikiArticle(BaseModel):
title: str
url: str
extract: str
image_url: str | None


async def search_wiki(
url: str,
search: str,
*,
aiohttp_session: aiohttp.ClientSession | None = None,
random: bool = False,
redirect: bool = True
) -> WikiArticle:
# TODO: Add User-Agent
# TODO: Use textwrap
if aiohttp_session_not_passed := (aiohttp_session is None):
aiohttp_session = aiohttp.ClientSession()
try:
if random:
async with aiohttp_session.get(
url, params = {
"action": "query", "list": "random", "rnnamespace": 0,
"format": "json"
}
) as resp:
data = await resp.json()

search = data["query"]["random"][0]["title"]
else:
async with aiohttp_session.get(
url, params = {
"action": "query", "list": "search", "srsearch": search,
"srinfo": "suggestion", "srlimit": 1, "format": "json"
}
) as resp:
data = await resp.json()

if search := data["query"]["search"]:
search = search[0]["title"]
elif not (
search := data["query"].get("searchinfo", {}).get("suggestion")
):
raise ValueError("Page not found")

async with aiohttp_session.get(
url, params = {
"action": "query", "redirects": "",
"prop": "info|extracts|pageimages", "titles": search,
"inprop": "url", "exintro": "", "explaintext": "",
"pithumbsize": 9000, "pilicense": "any", "format": "json"
} # TODO: Use exchars?
) as resp:
data = await resp.json()

if "pages" not in data["query"]:
raise ValueError("Error") # TODO: More descriptive error

page_id = list(data["query"]["pages"].keys())[0]
page = data["query"]["pages"][page_id]

if "missing" in page:
raise ValueError("Page not found")
elif "invalid" in page:
raise ValueError(page["invalidreason"])
elif redirect and "redirects" in data["query"]:
return await search_wiki(
url, data["query"]["redirects"][-1]["to"],
aiohttp_session = aiohttp_session,
redirect = False
)
# TODO: Handle section links/tofragments
else:
thumbnail = data["query"]["pages"][page_id].get("thumbnail")

if "extract" not in page:
async with aiohttp_session.get(
url, params = {
"action": "parse", "page": search, "prop": "text",
"format": "json"
}
) as resp:
data = await resp.json()

p = BeautifulSoup(
data["parse"]["text"]['*'], "lxml"
).body.div.find_all('p', recursive = False)

first_p = p[0]
if first_p.aside:
first_p.aside.clear()
extract = first_p.get_text()

if len(p) > 1:
second_p = p[1]
extract += '\n' + second_p.get_text()

extract = re.sub(
r"\n\s*\n", "\n\n",
extract if len(extract) <= 512
else extract[:512] + "..."
)
else:
extract = re.sub(
r"\s+ \s+", ' ',
page["extract"] if len(page["extract"]) <= 512
else page["extract"][:512] + "..."
)

return WikiArticle(
title = page["title"],
url = page["fullurl"], # TODO: Use canonicalurl?
extract = extract,
image_url = (
thumbnail["source"].replace(
f"{thumbnail['width']}px", "1200px"
) if thumbnail else None
)
)
finally:
if aiohttp_session_not_passed:
await aiohttp_session.close()
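
For reference, search_wiki can also manage its own HTTP session: when no aiohttp_session is passed, it creates an aiohttp.ClientSession and closes it in the finally block above. A minimal standalone usage sketch, not part of this commit (the query string is a placeholder, and the import assumes the repository root is on sys.path):

import asyncio

from units.wikis import search_wiki

async def main():
    # No aiohttp_session passed: search_wiki creates and closes its own session
    try:
        article = await search_wiki(
            "https://en.wikipedia.org/w/api.php", "Old School RuneScape"
        )
    except ValueError as error:
        # Raised for "Page not found" and invalid titles
        print(f"Lookup failed: {error}")
    else:
        print(article.title, article.url)
        print(article.extract)

asyncio.run(main())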
