-
-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy pathcog.py
125 lines (108 loc) · 5.48 KB
/
cog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""
Cog implementing subscriptions to forum posts based on their tags
"""
import disnake
from disnake.ext import commands
import utils
from buttons.general import TrashView
from cogs.base import Base
from database.subscription import AlreadyNotifiedDB, SubscriptionDB
from rubbergod import Rubbergod
from utils import cooldowns
from .messages_cz import MessagesCZ
async def autocomp_available_tags(inter: disnake.ApplicationCommandInteraction, user_input: str):
    """Autocomplete tag names available in the forum channel chosen so far.

    Returns an empty list while the ``channel`` option has not been filled in.
    Otherwise returns the names of the channel's available tags that contain
    ``user_input`` as a (case-sensitive) substring, capped at 25 entries.
    """
    options = inter.filled_options
    if "channel" in options:
        # Resolve the selected channel through the bot so its tag list can be read.
        forum: disnake.ForumChannel = await inter.bot.fetch_channel(options["channel"])
        matches = []
        for forum_tag in forum.available_tags:
            if user_input in forum_tag.name:
                matches.append(forum_tag.name)
        return matches[:25]
    return []
async def autocomp_subscribed_tags(inter: disnake.ApplicationCommandInteraction, user_input: str):
    """Autocomplete tags the invoking user is subscribed to in the chosen channel.

    Returns an empty list while the ``channel`` option has not been filled in
    (instead of failing with ``KeyError``). Otherwise returns the user's
    subscribed tags for that channel that contain ``user_input`` as a
    (case-sensitive) substring, capped at 25 entries.
    """
    # The tag option can be focused before a channel is picked; there is
    # nothing to look up in that case.
    if "channel" not in inter.filled_options:
        return []
    channel_id = str(inter.filled_options["channel"])
    tags = SubscriptionDB.get_tags(str(inter.author.id), channel_id)
    return [tag for tag in tags if user_input in tag][:25]
class Subscriptions(Base, commands.Cog):
    # Cog providing the /subscription command group (add / remove / list) and
    # the thread listeners that DM subscribers when a forum thread carries a
    # tag they subscribed to.
    #
    # Documentation is kept in comments rather than new docstrings on the class
    # and command callbacks so the strings the framework may read from
    # docstrings stay exactly as they were.

    def __init__(self, bot: Rubbergod):
        super().__init__()
        self.bot = bot

    @cooldowns.short_cooldown
    @commands.slash_command(name="subscription")
    async def subscription(self, inter: disnake.ApplicationCommandInteraction):
        """Group of commands for forum subscriptions."""
        # Defer ephemerally; the sub-commands below answer by editing this response.
        await inter.response.defer(ephemeral=True)

    # /subscription add: subscribe the invoking user to `tag` in forum `channel`.
    # Replies with an error message when the tag does not exist in the channel
    # or when the subscription already exists.
    @subscription.sub_command(name="add", description=MessagesCZ.add_brief)
    async def add(
        self,
        inter: disnake.ApplicationCommandInteraction,
        channel: disnake.ForumChannel,
        tag: str = commands.Param(autocomplete=autocomp_available_tags),
    ):
        available_tags = [forum_tag.name for forum_tag in channel.available_tags]
        if tag not in available_tags:
            await inter.edit_original_response(MessagesCZ.tag_not_found(channel=channel.mention, tag=tag))
            return
        if SubscriptionDB.get(str(inter.author.id), str(channel.id), tag):
            await inter.edit_original_response(MessagesCZ.already_subscribed)
            return
        # NOTE(review): ids are passed here without str(), unlike the other
        # SubscriptionDB calls in this class - presumably add() converts them; confirm.
        SubscriptionDB.add(inter.author.id, channel.id, tag)
        await inter.edit_original_response(MessagesCZ.subscription_added(channel=channel.mention, tag=tag))

    # /subscription remove: delete the invoking user's subscription to `tag` in
    # `channel`, or report that no such subscription exists.
    @subscription.sub_command(name="remove", description=MessagesCZ.remove_brief)
    async def remove(
        self,
        inter: disnake.ApplicationCommandInteraction,
        channel: disnake.ForumChannel,
        tag: str = commands.Param(autocomplete=autocomp_subscribed_tags),
    ):
        sub = SubscriptionDB.get(str(inter.author.id), str(channel.id), tag)
        if not sub:
            await inter.edit_original_response(MessagesCZ.subscription_not_found(tag=tag))
            return
        sub.remove()
        await inter.edit_original_response(MessagesCZ.subscription_removed(channel=channel.mention, tag=tag))

    # /subscription list: show every subscription of the invoking user, one
    # "<#forum> - tag" line per subscription under the list title.
    @subscription.sub_command(name="list", description=MessagesCZ.list_brief)
    async def list(self, inter: disnake.ApplicationCommandInteraction):
        subs = SubscriptionDB.get_user(str(inter.author.id))
        sub_list = [f"> <#{sub.forum_id}> - {sub.tag}" for sub in subs]
        message = f"{MessagesCZ.list_title}\n" + "\n".join(sub_list)
        await inter.edit_original_response(message)

    @commands.Cog.listener()
    async def on_thread_create(self, thread: disnake.Thread):
        """Notify subscribers of any tag applied to a newly created thread."""
        if not thread.applied_tags:
            # thread without tags
            return
        await self._notify_subscribers(thread, [tag.name for tag in thread.applied_tags])

    @commands.Cog.listener()
    async def on_thread_update(self, before: disnake.Thread, after: disnake.Thread):
        """Notify subscribers of tags that were added to a thread by this update."""
        if not after.applied_tags:
            # thread without tags
            return
        # Only tags that were not applied before the update count as new.
        added = [tag.name for tag in after.applied_tags if tag not in before.applied_tags]
        await self._notify_subscribers(after, added)

    async def _notify_subscribers(self, thread: disnake.Thread, tag_names) -> None:
        """Send a notification to each subscriber of ``tag_names`` in the thread's parent channel.

        Users already recorded in ``AlreadyNotifiedDB`` for this thread are
        skipped, and every user notified here is recorded there, so a user is
        notified about a given thread only once.
        """
        wanted = set(tag_names)
        if not wanted:
            # Nothing relevant changed; skip the database lookups entirely.
            return
        subs = SubscriptionDB.get_channel(str(thread.parent_id))
        already_notified = AlreadyNotifiedDB.get(str(thread.id))
        for sub in subs:
            if sub.tag in wanted and sub.user_id not in already_notified:
                await self.send_notification(sub.user_id, thread)
                # Track locally as well so a user subscribed to several
                # matching tags is not notified twice within this loop.
                already_notified.append(sub.user_id)
                AlreadyNotifiedDB.add(sub.user_id, str(thread.id))

    async def send_notification(self, user_id: str, thread: disnake.Thread):
        """Send ``user_id`` a direct message with an embed describing ``thread``.

        The embed links to the thread, uses the content of the thread's oldest
        message as its description when one is available, and lists the thread
        author, the thread mention and its applied tags.
        """
        user: disnake.User = await self.bot.get_or_fetch_user(user_id)
        # get content of first message if available
        first_message = await thread.history(limit=1, oldest_first=True).flatten()
        # Test the list itself: indexing an empty history would raise IndexError.
        content = first_message[0].content if first_message else None
        # thread.owner may be None; fall back to looking the owner up by id.
        owner = thread.owner
        if owner is None and thread.owner_id is not None:
            owner = await self.bot.get_or_fetch_user(thread.owner_id)
        embed = disnake.Embed(
            title=MessagesCZ.embed_title,
            url=thread.jump_url,
            description=content,
        )
        embed.add_field(name=MessagesCZ.embed_author, value=owner.display_name)
        embed.add_field(name=MessagesCZ.embed_channel, value=thread.mention)
        tags = [f"`{tag.name}`" for tag in thread.applied_tags]
        embed.add_field(name=MessagesCZ.embed_tags, value=", ".join(tags))
        utils.embed.add_author_footer(embed, owner)
        await user.send(embed=embed, view=TrashView())