diff --git a/src/bot.py b/src/bot.py index b5a32c8..81ac552 100644 --- a/src/bot.py +++ b/src/bot.py @@ -2,17 +2,20 @@ import json import asyncio import logging +from typing import List import discord from discord import app_commands from discord.ext import tasks from src.rss import check_posts +from src.teams import TeamsData from src.constants import ( SPECIAL_ROLE, VERIFIED_ROLE, CHANNEL_PREFIX, VOLUNTEER_ROLE, + TEAM_LEADER_ROLE, FEED_CHANNEL_NAME, FEED_CHECK_INTERVAL, ANNOUNCE_CHANNEL_NAME, @@ -28,6 +31,14 @@ create_voice, create_team_channel, ) +from src.commands.stats import ( + Stats, + post_stats, + stats_subscribe, + SubscribedMessage, + SUBSCRIBE_MSG_FILE, + load_subscribed_messages, +) from src.commands.passwd import passwd @@ -37,10 +48,13 @@ class BotClient(discord.Client): verified_role: discord.Role special_role: discord.Role volunteer_role: discord.Role + supervisor_role: discord.Role welcome_category: discord.CategoryChannel announce_channel: discord.TextChannel passwords: dict[str, str] feed_channel: discord.TextChannel + teams_data: TeamsData = TeamsData([]) + subscribed_messages: List[SubscribedMessage] def __init__( self, @@ -64,10 +78,15 @@ def __init__( team.add_command(create_team_channel) team.add_command(export_team) self.tree.add_command(team, guild=self.guild) + stats = Stats() + stats.add_command(post_stats) + stats.add_command(stats_subscribe) self.tree.add_command(passwd, guild=self.guild) + self.tree.add_command(stats, guild=self.guild) self.tree.add_command(join, guild=self.guild) self.tree.add_command(logs, guild=self.guild) self.load_passwords() + load_subscribed_messages(self) async def setup_hook(self) -> None: # This copies the global commands over to your guild. @@ -86,6 +105,7 @@ async def on_ready(self) -> None: verified_role = discord.utils.get(guild.roles, name=VERIFIED_ROLE) special_role = discord.utils.get(guild.roles, name=SPECIAL_ROLE) volunteer_role = discord.utils.get(guild.roles, name=VOLUNTEER_ROLE) + supervisor_role = discord.utils.get(guild.roles, name=TEAM_LEADER_ROLE) welcome_category = discord.utils.get(guild.categories, name=WELCOME_CATEGORY_NAME) announce_channel = discord.utils.get(guild.text_channels, name=ANNOUNCE_CHANNEL_NAME) feed_channel = discord.utils.get(guild.text_channels, name=FEED_CHANNEL_NAME) @@ -94,6 +114,7 @@ async def on_ready(self) -> None: verified_role is None or special_role is None or volunteer_role is None + or supervisor_role is None or welcome_category is None or announce_channel is None or feed_channel is None @@ -104,10 +125,14 @@ async def on_ready(self) -> None: self.verified_role = verified_role self.special_role = special_role self.volunteer_role = volunteer_role + self.supervisor_role = supervisor_role self.welcome_category = welcome_category self.announce_channel = announce_channel self.feed_channel = feed_channel + self.teams_data.gen_team_memberships(self.guild, self.supervisor_role) + await self.update_subscribed_messages() + async def on_member_join(self, member: discord.Member) -> None: name = member.display_name self.logger.info(f"Member {name} joined") @@ -136,12 +161,49 @@ async def on_member_join(self, member: discord.Member) -> None: async def on_member_remove(self, member: discord.Member) -> None: name = member.display_name self.logger.info(f"Member '{name}' left") + + if self.verified_role in member.roles: + return + for channel in self.welcome_category.channels: # If the only user able to see it is the bot, then delete it. if channel.overwrites.keys() == {member.guild.default_role, member.guild.me}: await channel.delete() self.logger.info(f"Deleted channel '{channel.name}', because it has no users.") + async def on_member_update(self, before: discord.Member, after: discord.Member) -> None: + """Update subscribed messages when a member's roles change.""" + if isinstance(self.guild, discord.Guild): + self.teams_data.gen_team_memberships(self.guild, self.supervisor_role) + + await self.update_subscribed_messages() + + async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent) -> None: + """Remove subscribed messages by reacting with a cross mark.""" + if payload.emoji.name != '\N{CROSS MARK}': + return + if SubscribedMessage(payload.channel_id, payload.message_id) not in self.subscribed_messages: + # Ignore for messages not in the subscribed list + return + if payload.member is None: + # Ignore for users not in the server + return + if self.volunteer_role not in payload.member.roles: + # Ignore for users without admin privileges + return + + await self.remove_subscribed_message( + SubscribedMessage(payload.channel_id, payload.message_id), + ) + + def _save_subscribed_messages(self) -> None: + """Save subscribed messages to file.""" + with open(SUBSCRIBE_MSG_FILE, 'w') as f: + json.dump( + [x._asdict() for x in self.subscribed_messages], + f, + ) + @tasks.loop(seconds=FEED_CHECK_INTERVAL) async def check_for_new_blog_posts(self) -> None: self.logger.info("Checking for new blog posts") @@ -177,3 +239,54 @@ def remove_password(self, tla: str) -> None: del self.passwords[tla.upper()] with open('passwords.json', 'w') as f: json.dump(self.passwords, f) + + def stats_message(self, members: bool = True, warnings: bool = True, statistics: bool = False) -> str: + """Generate a message string for the given options.""" + return '\n\n'.join([ + *([self.teams_data.team_summary()] if members else []), + *([self.teams_data.warnings()] if warnings else []), + *([self.teams_data.statistics()] if statistics else []), + ]) + + def add_subscribed_message(self, msg: SubscribedMessage) -> None: + """Add a subscribed message to the subscribed list.""" + self.subscribed_messages.append(msg) + self._save_subscribed_messages() + + async def remove_subscribed_message(self, msg: SubscribedMessage) -> None: + """Remove a subscribed message from the channel and subscribed list.""" + msg_channel = await self.fetch_channel(msg.channel_id) + if not hasattr(msg_channel, 'fetch_message'): + # ignore for channels that don't support message editing + return + message = await msg_channel.fetch_message(msg.message_id) + + if message: # message may have already been deleted manually + chan_name = message.channel.name if hasattr(message.channel, 'name') else 'unknown channel' + print(f'Removing message in {chan_name} from {message.author.name}') + await message.delete() # remove message from discord + + # remove message from subscription list and save to file + self.subscribed_messages.remove(msg) + self._save_subscribed_messages() + + async def update_subscribed_messages(self) -> None: + """Update all subscribed messages.""" + print('Updating subscribed messages') + for sub_msg in self.subscribed_messages: # edit all subscribed messages + message = self.stats_message( + sub_msg.members, + sub_msg.warnings, + sub_msg.stats, + ) + message = f"```\n{message}\n```" + + try: + msg_channel = await self.fetch_channel(sub_msg.channel_id) + if not hasattr(msg_channel, 'fetch_message'): + # ignore for channels that don't support message editing + continue + msg = await msg_channel.fetch_message(sub_msg.message_id) + await msg.edit(content=message) + except AttributeError: # message is no longer available + await self.remove_subscribed_message(sub_msg) diff --git a/src/commands/join.py b/src/commands/join.py index 9c8c30c..ea9db35 100644 --- a/src/commands/join.py +++ b/src/commands/join.py @@ -79,6 +79,13 @@ async def join(interaction: discord.Interaction["BotClient"], password: str) -> def find_team(client: "BotClient", member: discord.Member, entered: str) -> str | None: + entered = (entered.lower() + .replace(" ", "-") + .replace("_", "-") + # German layout typos: + .replace("/", "-") + .replace("ß", "-")) + for team_name, password in client.passwords.items(): if entered == password: client.logger.info( diff --git a/src/commands/stats.py b/src/commands/stats.py new file mode 100644 index 0000000..78d1ce4 --- /dev/null +++ b/src/commands/stats.py @@ -0,0 +1,124 @@ +# file to store messages being dynamically updated between reboots +import json +from typing import Any, Dict, NamedTuple, TYPE_CHECKING + +import discord +from discord import app_commands + +from src.constants import VOLUNTEER_ROLE + +if TYPE_CHECKING: + from src.bot import BotClient + +SUBSCRIBE_MSG_FILE = 'subscribed_messages.json' + + +class SubscribedMessage(NamedTuple): + """A message that is updated when the server statistics change.""" + + channel_id: int + message_id: int + members: bool = True + warnings: bool = True + stats: bool = False + + @classmethod + def load(cls, dct: Dict[str, Any]) -> 'SubscribedMessage': # type:ignore[misc] + """Load a SubscribedMessage object from a dictionary.""" + return cls(**dct) + + def __eq__(self, comp: object) -> bool: + if not isinstance(comp, SubscribedMessage): + return False + return ( + self.channel_id == comp.channel_id + and self.message_id == comp.message_id + ) + + +@app_commands.guild_only() +@app_commands.default_permissions() +class Stats(app_commands.Group): + pass + + +@app_commands.command(name='post') # type:ignore[arg-type] +@app_commands.describe( + members='Display the number of members in each team', + warnings='Display warnings about missing supervisors and empty teams', + stats='Display statistics about the teams', +) +@app_commands.checks.has_role(VOLUNTEER_ROLE) +async def post_stats( + ctx: discord.interactions.Interaction["BotClient"], + members: bool = False, + warnings: bool = False, + stats: bool = False, +) -> None: + """Generate statistics for the server and send them to the channel.""" + if (members, warnings, stats) == (False, False, False): + members = True + warnings = True + message = ctx.client.stats_message(members, warnings, stats) + + await send_response(ctx, message) + + +@discord.app_commands.command(name='subscribe') # type:ignore[arg-type] +@app_commands.describe( + members='Display the number of members in each team', + warnings='Display warnings about missing supervisors and empty teams', + stats='Display statistics about the teams', +) +@app_commands.checks.has_role(VOLUNTEER_ROLE) +async def stats_subscribe( + ctx: discord.interactions.Interaction["BotClient"], + members: bool = False, + warnings: bool = False, + stats: bool = False, +) -> None: + """Subscribe to updates for statistics for the server and send a subscribed message.""" + if (members, warnings, stats) == (False, False, False): + members = True + warnings = True + message = ctx.client.stats_message(members, warnings, stats) + + bot_message = await send_response(ctx, message) + if bot_message is None: + return + ctx.client.add_subscribed_message(SubscribedMessage( + bot_message.channel.id, + bot_message.id, + members, + warnings, + stats, + )) + + +async def send_response( + ctx: discord.interactions.Interaction['BotClient'], + message: str, +) -> discord.Message | None: + """Respond to an interaction and return the bot's message object.""" + try: + await ctx.response.send_message(f"```\n{message}\n```") + bot_message = await ctx.original_response() + except discord.NotFound as e: + print('Unable to find original message') + print(e) + except (discord.HTTPException, discord.ClientException) as e: + print('Unable to connect to discord server') + print(e) + else: + return bot_message + return None + + +def load_subscribed_messages(client: 'BotClient') -> None: + """Load subscribed message details from file.""" + try: + with open(SUBSCRIBE_MSG_FILE) as f: + client.subscribed_messages = json.load(f, object_hook=SubscribedMessage.load) + except (json.JSONDecodeError, FileNotFoundError): + with open(SUBSCRIBE_MSG_FILE, 'w') as f: + f.write('[]') diff --git a/src/teams.py b/src/teams.py new file mode 100644 index 0000000..bcba82b --- /dev/null +++ b/src/teams.py @@ -0,0 +1,155 @@ +from typing import List, NamedTuple +from statistics import mean +from collections import defaultdict + +import discord + +from src.constants import ROLE_PREFIX + + +class TeamData(NamedTuple): + """Stores the TLA, number of members and presence of a team supervisor for a team.""" + + TLA: str + members: int = 0 + leader: bool = False + + def has_leader(self) -> bool: + """Return whether the team has a leader.""" + return self.leader + + def is_primary(self) -> bool: + """Return whether the team is a primary team.""" + return not self.TLA[-1].isdigit() or self.TLA[-1] == '1' + + def school(self) -> str: + """TLA without the team number.""" + return ''.join(c for c in self.TLA if c.isalpha()) + + def __str__(self) -> str: + data_str = f'{self.TLA:<15} {self.members:>2}' + if self.leader is False: + data_str += ' No supervisor' + return data_str + + +class TeamsData(NamedTuple): + """A container for a list of TeamData objects.""" + + teams_data: List[TeamData] + + def gen_team_memberships(self, guild: discord.Guild, leader_role: discord.Role) -> None: + """Generate a list of TeamData objects for the given guild, stored in teams_data.""" + teams_data = [] + + for role in filter(lambda role: role.name.startswith(ROLE_PREFIX), guild.roles): + team_data = TeamData( + TLA=role.name[len(ROLE_PREFIX):], + members=len(list(filter( + lambda member: leader_role not in member.roles, + role.members, + ))), + leader=len(list(filter( + lambda member: leader_role in member.roles, + role.members, + ))) > 0, + ) + + teams_data.append(team_data) + + teams_data.sort(key=lambda team: team.TLA) # sort by TLA + self.teams_data.clear() + self.teams_data.extend(teams_data) + + @property + def empty_tlas(self) -> List[str]: + """A list of TLAs for teams with no members or supervisors.""" + return [ + team.TLA + for team in self.teams_data + if not team.leader and team.members == 0 + ] + + @property + def missing_leaders(self) -> List[str]: + """A list of TLAs for teams with no supervisors but at least one member.""" + return [ + team.TLA + for team in self.teams_data + if not team.leader and team.members > 0 + ] + + @property + def leader_only(self) -> List[str]: + """A list of TLAs for teams with only supervisors and no members.""" + return [ + team.TLA + for team in self.teams_data + if team.leader and team.members == 0 + ] + + @property + def empty_primary_teams(self) -> List[str]: + """A list of TLAs for primary teams with no members.""" + return [ + team.TLA + for team in self.teams_data + if team.is_primary() and team.TLA in self.empty_tlas + ] + + @property + def primary_leader_only(self) -> List[str]: + """A list of TLAs for primary teams with only supervisors.""" + return [ + team.TLA + for team in self.teams_data + if team.is_primary() and team.TLA in self.leader_only + ] + + def team_summary(self) -> str: + """A summary of the teams.""" + return '\n'.join([ + 'Members per team', + *( + str(team) + for team in self.teams_data + ), + ]) + + def warnings(self) -> str: + """A list of warnings for the teams.""" + return '\n'.join([ + f'Empty teams: {len(self.empty_tlas)}', + f'Teams without supervisors: {len(self.missing_leaders)}', + f'Teams with only supervisors: {len(self.leader_only)}', + '', + f'Empty primary teams: {len(self.empty_primary_teams)}', + f'Primary teams with only supervisors: {len(self.primary_leader_only)}', + ]) + + def statistics(self) -> str: + """A list of statistics for the teams.""" + num_teams: int = len(self.teams_data) + member_counts = [team.members for team in self.teams_data] + num_members = sum(member_counts) + num_schools = len([team for team in self.teams_data if team.is_primary()]) + + min_team = min(self.teams_data, key=lambda x: x.members) + max_team = max(self.teams_data, key=lambda x: x.members) + + school_members = defaultdict(list) + for team in self.teams_data: + school_members[team.school()].append(team.members) + school_avg = {school: mean(members) for school, members in school_members.items()} + max_avg_school, max_avg_size = max(school_avg.items(), key=lambda x: x[1]) + + return '\n'.join([ + f'Total teams: {num_teams}', + f'Total schools: {num_schools}', + f'Total students: {num_members}', + f'Max team size: {max_team.members} ({max_team.TLA})', + f'Min team size: {min_team.members} ({min_team.TLA})', + f'Average team size: {mean(member_counts):.1f}', + f'Average school members: {num_members / num_schools:.1f}', + f'Max team size, school average: {max_avg_size:.1f} ({max_avg_school})', + ])