From abc7a29abeb969d458585d7a1fa7b392aee4fc8c Mon Sep 17 00:00:00 2001 From: Henry Hollenstain Date: Sun, 18 Sep 2022 16:04:31 -0700 Subject: [PATCH] Lookup enabled categories to create custom channels --- autochannel/lib/plugins/autochannels.py | 248 +++++++++++++----------- 1 file changed, 138 insertions(+), 110 deletions(-) diff --git a/autochannel/lib/plugins/autochannels.py b/autochannel/lib/plugins/autochannels.py index 14666ac..1305421 100644 --- a/autochannel/lib/plugins/autochannels.py +++ b/autochannel/lib/plugins/autochannels.py @@ -3,6 +3,7 @@ from typing import Optional import asyncio import datetime +from unicodedata import category import discord import logging import queue @@ -102,14 +103,13 @@ async def queue_loop(self): @task_metrics_counter - async def ac_delete_channel(self, cat, force, **kwargs): + async def ac_delete_channel(self, cat: discord.CategoryChannel, force: bool, **kwargs) -> None: """_summary_ Args: - cat (_type_): _description_ - force (_type_): _description_ + cat (discord.CategoryChannel): _description_ + force (bool): _description_ """ - category = self.autochannel.session.query(Category).get(cat.id) db_channel_list_id = category.get_channels() if force: @@ -129,15 +129,15 @@ async def ac_delete_channel(self, cat, force, **kwargs): LOG.debug (f'GUILD: {cat.guild.name} CAT: {cat.name} : No more channels to clean up') @task_metrics_counter - async def ac_create_channel(self, cat, **kwargs): + async def ac_create_channel(self, cat: discord.CategoryChannel, **kwargs) -> Optional[discord.VoiceChannel]: """_summary_ Args: - cat (_type_): _description_ + cat (discord.CategoryChannel): _description_ Returns: - _type_: _description_ - """ + Optional[discord.VoiceChannel]: _description_ + """ db_cat = self.autochannel.session.query(Category).get(cat.id) db_channel_list_id = db_cat.get_channels() auto_channels = [channel for channel in cat.voice_channels if channel.id in db_channel_list_id] @@ -162,16 +162,22 @@ async def ac_create_channel(self, cat, **kwargs): return created_channel @task_metrics_counter - async def vc_delete_channel(self, voicechannel, **kwargs): + async def vc_delete_channel(self, voicechannel: discord.VoiceChannel, **kwargs) -> None: """_summary_ Args: - voicechannel (_type_): _description_ - """ + voicechannel (discord.VoiceChannel): _description_ + """ reason = kwargs.get('reason') await voicechannel.delete(reason=reason) - async def manage_auto_voice_channels(self, autochannel, guild=None): + async def manage_auto_voice_channels(self, autochannel, guild=None) -> None: + """_summary_ + + Args: + autochannel (_type_): _description_ + guild (_type_, optional): _description_. Defaults to None. + """ db_cats_disabled = None if guild: db_cats = list(self.autochannel.session.query(Category).with_entities(Category.id).filter_by(enabled=True, guild_id=guild.id).all()) @@ -292,25 +298,15 @@ async def ac_prefix_sync(self, channel, db_cat, **kwargs): LOG.debug(f'Channel suffix being updated: {channel_db.num_suffix}') await channel.edit(name=f'{db_cat.prefix} - {channel_db.num_suffix}') - def ac_highest_empty_channel(self, empty_auto_channels): - """[summary] - Takes in alist of empty channel objects and returns the channel object - with the highest numbered suffix example channels '1', '5', '6' are in - list but will return the '6' channel as the higest empty channel - - Arguments: - empty_auto_channels {[list of channel objects]} -- list of empty - channel objects - """ - highest_empty_channel = None - for ec in empty_auto_channels: - if not highest_empty_channel: - highest_empty_channel = ec - if self.get_ac_channel(ec) > self.get_ac_channel(highest_empty_channel): - highest_empty_channel = ec - return highest_empty_channel + def ac_db_highest_empty_channel(self, empty_auto_channels: list) -> Optional[discord.VoiceChannel]: + """_summary_ + + Args: + empty_auto_channels (list): _description_ - def ac_db_highest_empty_channel(self, empty_auto_channels): + Returns: + Optional[discord.VoiceChannel]: _description_ + """ highest_empty_channel = None highest_empty_channel_num = None for ec in empty_auto_channels: @@ -333,7 +329,7 @@ async def on_ready(self): @app_commands.checks.has_permissions(manage_channels=True) @app_commands.command(name="sync", description="sync from http://auto-chan.io") @command_metrics_counter - async def sync(self, interaction: discord.Interaction): + async def sync(self, interaction: discord.Interaction) -> None: """_summary_ Args: @@ -344,13 +340,19 @@ async def sync(self, interaction: discord.Interaction): @sync.error - async def on_sync_error(self, interaction: discord.Interaction, error: app_commands.AppCommandError): - await interaction.response.send_message(f"Error {error}") + async def on_sync_error(self, interaction: discord.Interaction, error: app_commands.AppCommandError) -> None: + """_summary_ + Args: + interaction (discord.Interaction): _description_ + error (app_commands.AppCommandError): _description_ + """ + await interaction.response.send_message(f"Error {error}") + @app_commands.describe(category='name of category', channel_name='custom name of channel') @app_commands.command(name="vc", description="create voice channel") @command_metrics_counter - async def vc(self, interaction: discord.Interaction, category: str, channelname: str = ''): + async def vc(self, interaction: discord.Interaction, category: str, channel_name: str = '') -> None: """_summary_ Args: @@ -366,39 +368,62 @@ async def vc(self, interaction: discord.Interaction, category: str, channelname: if not self.validateCategory(interaction, category): raise ACUnknownCategory(f'Unknown Discord category') - categoryName = [cat for cat in interaction.guild.categories if cat.name.lower() in category.lower()][0] - categoryObj = self.autochannel.session.query(Category).get(categoryName.id) + category_name = [cat for cat in interaction.guild.categories if cat.name.lower() in category.lower()][0] + category_obj = self.autochannel.session.query(Category).get(category_name.id) - if channelname: - vcSuffix = channelname - if pf.is_profane(channelname): + """Checks if there is channel name and if there is profanity used""" + if channel_name: + vc_suffix = channel_name + if pf.is_profane(channel_name): raise VCProfaneWordused(f'Used a profane word when creating a custom voice channel') - else: - vcSuffix = self.vc_channel_number(interaction, category, categoryObj) + vc_suffix = self.vc_channel_number(interaction, category, category_obj) + if not category_obj.custom_enabled: + raise ACDisabledCustomCategory(f'Category {category_name.name} is disabled. To use custom channels in this category an **ADMIN** must enable: http://auto-chan.io') - if not categoryObj.custom_enabled: - raise ACDisabledCustomCategory(f'Category {categoryName.name} is disabled. To use custom channels in this category an **ADMIN** must enable: http://auto-chan.io') - - createdVC= await interaction.guild.create_voice_channel(f'{categoryObj.custom_prefix} {vcSuffix}', overwrites={}, category=categoryName, reason='AutoChannel bot automation') - # overwrite = discord.PermissionOverwrite() - # overwrite.manage_channels = True - # overwrite.manage_roles = True - # await created_channel.set_permissions(ctx.message.author, overwrite=overwrite) - inviteLink = await self.createVCInvite(interaction, createdVC) + created_vc= await interaction.guild.create_voice_channel(f'{category_obj.custom_prefix} {vc_suffix}', overwrites={}, category=category_name, reason='AutoChannel bot automation') + invite_link = await self.createVCInvite(interaction, created_vc) - await interaction.channel.send(f'AutoChannel made `{interaction.user}` a channel `{createdVC.name}`') - await interaction.response.send_message(inviteLink) + await interaction.channel.send(f'AutoChannel made `{interaction.user}` a channel `{created_vc.name}`') + await interaction.response.send_message(invite_link) await asyncio.sleep(60) - if len(createdVC.members) < 1: + if len(created_vc.members) < 1: try: - await self.vc_delete_channel(createdVC, reason='No one joined the custom channel after 60 seconds') + await self.vc_delete_channel(created_vc, reason='No one joined the custom channel after 60 seconds') except: """annoying to see this error doesn't add value to the end user""" pass + + @vc.autocomplete('category') + async def vc_autocomplete(self, interaction: discord.Interaction, current: str) -> list[app_commands.Choice[str]]: + """_summary_ + + Args: + interaction (discord.Interaction): _description_ + current (str): _description_ + + Returns: + list[app_commands.Choice[str]]: _description_ + """ + + """good chance this might cause lots of db queries if abused""" + category_objs = self.autochannel.session.query(Category).filter_by(custom_enabled=True, guild_id=interaction.guild.id).all() + + categories = [] + for category in category_objs: + try: + categories.append(interaction.guild.get_channel(category.id).name) + except Exception as e: + LOG.error(e) + + return [ + app_commands.Choice(name=category, value=category) + for category in categories if current.lower() in category.lower() + ] + @vc.error async def on_vc_error(self, interaction: discord.Interaction, error: app_commands.AppCommandError): """_summary_ @@ -418,15 +443,15 @@ async def on_vc_error(self, interaction: discord.Interaction, error: app_command await interaction.response.send_message(msg) - def valid_auto_channel(self, v_state): - """[summary] - - Arguments: - v_state {[type]} -- [description] - + def valid_auto_channel(self, v_state: discord.VoiceState) -> bool: + """_summary_ + + Args: + v_state (discord.VoiceState): _description_ + Returns: - [type] -- [description] - """ + bool: _description_ + """ if( v_state is not None and v_state.channel is not None and @@ -449,12 +474,13 @@ def valid_auto_channel(self, v_state): else: return False - async def after_ac_task(self, after, member=None): - """ - after_ac_task: handles the updates to the new channel the user entered - :param: object self: discord client - :param: object before: after state channel - """ + async def after_ac_task(self, after: discord.VoiceState, member=None) -> None: + """_summary_ + + Args: + after (discord.VoiceState): _description_ + member (_type_, optional): _description_. Defaults to None. + """ cat = after.channel.category category = self.autochannel.session.query(Category).get(cat.id) db_channel_list_id = category.get_channels() @@ -470,12 +496,13 @@ async def after_ac_task(self, after, member=None): await self.queue.put(q_object) - async def before_ac_task(self, before, member=None): - """ - before_ac_task: handles the updates to the old channel the user left - :param: object self: discord client - :param: object before: before state channel - """ + async def before_ac_task(self, before: discord.VoiceState, member=None) -> None: + """handles the updates to the old channel the user left + + Args: + before (discord.VoiceState): before state channel + member (_type_, optional): _description_. Defaults to None. + """ cat = before.channel.category category = self.autochannel.session.query(Category).get(cat.id) db_channel_list_id = category.get_channels() @@ -495,7 +522,7 @@ async def before_ac_task(self, before, member=None): @task_metrics_counter @commands.Cog.listener() - async def on_guild_channel_delete(self, channel): + async def on_guild_channel_delete(self, channel) -> None: """This tracks all channel deletes and if the channel is tracked by auto-chan, it will clean up the db entry. This is imporant due to admins deleting channels managed by auto-chan that otherwise would get missed. @@ -513,17 +540,17 @@ async def on_guild_channel_delete(self, channel): @task_metrics_counter @commands.Cog.listener() - async def on_voice_state_update(self, member, before, after): - """ - Checks before and afer states of channels users leave and join + async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState) -> None: + """Checks before and afer states of channels users leave and join this will handle logic to make sure there is no more than N (current defaulted to 1) empty channel at any time. This will also create a if less than N (current set to 1). - :param: object self: discord client - :param: object member: discord member - :param: object before: before voice channel object - :param: object after: after voice channel object - """ + + Args: + member (discord.Member): discord member + before (discord.VoiceState): before voice channel object + after (discord.VoiceState): after voice channel object + """ if( before.channel is not None and before.channel.category is not None and @@ -540,37 +567,28 @@ async def on_voice_state_update(self, member, before, after): if self.valid_auto_channel(after): await self.after_ac_task(after, member=member) - def ac_channel_number(self, auto_channels): - """ - returns number from auto prefix and returns the lowest number - :param: object self: discord client - :param: objects auto_channels: List of voice_channels objects - :returns the lowest number missing from the sequence of voice_channels - """ - suffix_list = [] - for channel in auto_channels: - suffix_list.append(int(''.join(channel.name.split(' ')[-1:]))) + def ac_db_channel_number(self, db_cat) -> int: + """returns number from auto prefix and returns the lowest number - return utils.missing_numbers(suffix_list)[0] + Args: + db_cat (_type_): List of voice_channels objects - def ac_db_channel_number(self, db_cat): - """ - returns number from auto prefix and returns the lowest number - :param: object self: discord client - :param: objects auto_channels: List of voice_channels objects - :returns the lowest number missing from the sequence of voice_channels - """ + Returns: + int: the lowest number missing from the sequence of voice_channels + """ suffix_list = db_cat.get_chan_suffix() return utils.missing_numbers(suffix_list)[0] - def get_ac_channel(self, auto_channel): - """ - returns the auto generated number suffix of a channel - :param: obejct self: discord client - :param: object auto_channel - :returns the channel suffix number - """ + def get_ac_channel(self, auto_channel) -> int: + """returns the auto generated number suffix of a channel + + Args: + auto_channel (_type_): _description_ + + Returns: + int: _description_ + """ return int(''.join(auto_channel.name.split(' ')[-1:])) def get_ac_db_channel(self, auto_channel): @@ -584,8 +602,18 @@ def get_ac_db_channel(self, auto_channel): """ return auto_channel.num_suffix - def vc_channel_number(self, interaction: discord.Interaction, category, categoryObj): - ac_channels = [vc for vc in interaction.guild.voice_channels if str(vc.category).lower().startswith(f'{category}') and vc.name.startswith(category.custom_prefix)] + def vc_channel_number(self, interaction: discord.Interaction, category, category_obj) -> int: + """_summary_ + + Args: + interaction (discord.Interaction): _description_ + category (_type_): _description_ + categoryObj (_type_): _description_ + + Returns: + int: _description_ + """ + ac_channels = [vc for vc in interaction.guild.voice_channels if str(vc.category).lower().startswith(f'{category}') and vc.name.startswith(category_obj.custom_prefix)] return (len(ac_channels) + 1) async def createVCInvite(self, interaction: discord.Interaction, voiceChannel) -> discord.Invite: