Skip to content

Commit

Permalink
Merge pull request #48 from hhollenstain/vc_autogenerated_cat
Browse files Browse the repository at this point in the history
Lookup enabled categories to create custom channels
  • Loading branch information
hhollenstain authored Sep 18, 2022
2 parents 3b70fb0 + abc7a29 commit 9506cbe
Showing 1 changed file with 138 additions and 110 deletions.
248 changes: 138 additions & 110 deletions autochannel/lib/plugins/autochannels.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Optional
import asyncio
import datetime
from unicodedata import category
import discord
import logging
import queue
Expand Down Expand Up @@ -102,14 +103,13 @@ async def queue_loop(self):


@task_metrics_counter
async def ac_delete_channel(self, cat, force, **kwargs):
async def ac_delete_channel(self, cat: discord.CategoryChannel, force: bool, **kwargs) -> None:
"""_summary_
Args:
cat (_type_): _description_
force (_type_): _description_
cat (discord.CategoryChannel): _description_
force (bool): _description_
"""

category = self.autochannel.session.query(Category).get(cat.id)
db_channel_list_id = category.get_channels()
if force:
Expand All @@ -129,15 +129,15 @@ async def ac_delete_channel(self, cat, force, **kwargs):
LOG.debug (f'GUILD: {cat.guild.name} CAT: {cat.name} : No more channels to clean up')

@task_metrics_counter
async def ac_create_channel(self, cat, **kwargs):
async def ac_create_channel(self, cat: discord.CategoryChannel, **kwargs) -> Optional[discord.VoiceChannel]:
"""_summary_
Args:
cat (_type_): _description_
cat (discord.CategoryChannel): _description_
Returns:
_type_: _description_
"""
Optional[discord.VoiceChannel]: _description_
"""
db_cat = self.autochannel.session.query(Category).get(cat.id)
db_channel_list_id = db_cat.get_channels()
auto_channels = [channel for channel in cat.voice_channels if channel.id in db_channel_list_id]
Expand All @@ -162,16 +162,22 @@ async def ac_create_channel(self, cat, **kwargs):
return created_channel

@task_metrics_counter
async def vc_delete_channel(self, voicechannel, **kwargs):
async def vc_delete_channel(self, voicechannel: discord.VoiceChannel, **kwargs) -> None:
"""_summary_
Args:
voicechannel (_type_): _description_
"""
voicechannel (discord.VoiceChannel): _description_
"""
reason = kwargs.get('reason')
await voicechannel.delete(reason=reason)

async def manage_auto_voice_channels(self, autochannel, guild=None):
async def manage_auto_voice_channels(self, autochannel, guild=None) -> None:
"""_summary_
Args:
autochannel (_type_): _description_
guild (_type_, optional): _description_. Defaults to None.
"""
db_cats_disabled = None
if guild:
db_cats = list(self.autochannel.session.query(Category).with_entities(Category.id).filter_by(enabled=True, guild_id=guild.id).all())
Expand Down Expand Up @@ -292,25 +298,15 @@ async def ac_prefix_sync(self, channel, db_cat, **kwargs):
LOG.debug(f'Channel suffix being updated: {channel_db.num_suffix}')
await channel.edit(name=f'{db_cat.prefix} - {channel_db.num_suffix}')

def ac_highest_empty_channel(self, empty_auto_channels):
"""[summary]
Takes in alist of empty channel objects and returns the channel object
with the highest numbered suffix example channels '1', '5', '6' are in
list but will return the '6' channel as the higest empty channel
Arguments:
empty_auto_channels {[list of channel objects]} -- list of empty
channel objects
"""
highest_empty_channel = None
for ec in empty_auto_channels:
if not highest_empty_channel:
highest_empty_channel = ec
if self.get_ac_channel(ec) > self.get_ac_channel(highest_empty_channel):
highest_empty_channel = ec
return highest_empty_channel
def ac_db_highest_empty_channel(self, empty_auto_channels: list) -> Optional[discord.VoiceChannel]:
"""_summary_
Args:
empty_auto_channels (list): _description_
def ac_db_highest_empty_channel(self, empty_auto_channels):
Returns:
Optional[discord.VoiceChannel]: _description_
"""
highest_empty_channel = None
highest_empty_channel_num = None
for ec in empty_auto_channels:
Expand All @@ -333,7 +329,7 @@ async def on_ready(self):
@app_commands.checks.has_permissions(manage_channels=True)
@app_commands.command(name="sync", description="sync from http://auto-chan.io")
@command_metrics_counter
async def sync(self, interaction: discord.Interaction):
async def sync(self, interaction: discord.Interaction) -> None:
"""_summary_
Args:
Expand All @@ -344,13 +340,19 @@ async def sync(self, interaction: discord.Interaction):


@sync.error
async def on_sync_error(self, interaction: discord.Interaction, error: app_commands.AppCommandError):
await interaction.response.send_message(f"Error {error}")
async def on_sync_error(self, interaction: discord.Interaction, error: app_commands.AppCommandError) -> None:
"""_summary_
Args:
interaction (discord.Interaction): _description_
error (app_commands.AppCommandError): _description_
"""
await interaction.response.send_message(f"Error {error}")

@app_commands.describe(category='name of category', channel_name='custom name of channel')
@app_commands.command(name="vc", description="create voice channel")
@command_metrics_counter
async def vc(self, interaction: discord.Interaction, category: str, channelname: str = ''):
async def vc(self, interaction: discord.Interaction, category: str, channel_name: str = '') -> None:
"""_summary_
Args:
Expand All @@ -366,39 +368,62 @@ async def vc(self, interaction: discord.Interaction, category: str, channelname:
if not self.validateCategory(interaction, category):
raise ACUnknownCategory(f'Unknown Discord category')

categoryName = [cat for cat in interaction.guild.categories if cat.name.lower() in category.lower()][0]
categoryObj = self.autochannel.session.query(Category).get(categoryName.id)
category_name = [cat for cat in interaction.guild.categories if cat.name.lower() in category.lower()][0]
category_obj = self.autochannel.session.query(Category).get(category_name.id)

if channelname:
vcSuffix = channelname
if pf.is_profane(channelname):
"""Checks if there is channel name and if there is profanity used"""
if channel_name:
vc_suffix = channel_name
if pf.is_profane(channel_name):
raise VCProfaneWordused(f'Used a profane word when creating a custom voice channel')

else:
vcSuffix = self.vc_channel_number(interaction, category, categoryObj)
vc_suffix = self.vc_channel_number(interaction, category, category_obj)

if not category_obj.custom_enabled:
raise ACDisabledCustomCategory(f'Category {category_name.name} is disabled. To use custom channels in this category an **ADMIN** must enable: http://auto-chan.io')

if not categoryObj.custom_enabled:
raise ACDisabledCustomCategory(f'Category {categoryName.name} is disabled. To use custom channels in this category an **ADMIN** must enable: http://auto-chan.io')

createdVC= await interaction.guild.create_voice_channel(f'{categoryObj.custom_prefix} {vcSuffix}', overwrites={}, category=categoryName, reason='AutoChannel bot automation')
# overwrite = discord.PermissionOverwrite()
# overwrite.manage_channels = True
# overwrite.manage_roles = True
# await created_channel.set_permissions(ctx.message.author, overwrite=overwrite)
inviteLink = await self.createVCInvite(interaction, createdVC)
created_vc= await interaction.guild.create_voice_channel(f'{category_obj.custom_prefix} {vc_suffix}', overwrites={}, category=category_name, reason='AutoChannel bot automation')
invite_link = await self.createVCInvite(interaction, created_vc)

await interaction.channel.send(f'AutoChannel made `{interaction.user}` a channel `{createdVC.name}`')
await interaction.response.send_message(inviteLink)
await interaction.channel.send(f'AutoChannel made `{interaction.user}` a channel `{created_vc.name}`')
await interaction.response.send_message(invite_link)

await asyncio.sleep(60)
if len(createdVC.members) < 1:
if len(created_vc.members) < 1:
try:
await self.vc_delete_channel(createdVC, reason='No one joined the custom channel after 60 seconds')
await self.vc_delete_channel(created_vc, reason='No one joined the custom channel after 60 seconds')
except:
"""annoying to see this error doesn't add value to the end user"""
pass


@vc.autocomplete('category')
async def vc_autocomplete(self, interaction: discord.Interaction, current: str) -> list[app_commands.Choice[str]]:
"""_summary_
Args:
interaction (discord.Interaction): _description_
current (str): _description_
Returns:
list[app_commands.Choice[str]]: _description_
"""

"""good chance this might cause lots of db queries if abused"""
category_objs = self.autochannel.session.query(Category).filter_by(custom_enabled=True, guild_id=interaction.guild.id).all()

categories = []
for category in category_objs:
try:
categories.append(interaction.guild.get_channel(category.id).name)
except Exception as e:
LOG.error(e)

return [
app_commands.Choice(name=category, value=category)
for category in categories if current.lower() in category.lower()
]

@vc.error
async def on_vc_error(self, interaction: discord.Interaction, error: app_commands.AppCommandError):
"""_summary_
Expand All @@ -418,15 +443,15 @@ async def on_vc_error(self, interaction: discord.Interaction, error: app_command
await interaction.response.send_message(msg)


def valid_auto_channel(self, v_state):
"""[summary]
Arguments:
v_state {[type]} -- [description]
def valid_auto_channel(self, v_state: discord.VoiceState) -> bool:
"""_summary_
Args:
v_state (discord.VoiceState): _description_
Returns:
[type] -- [description]
"""
bool: _description_
"""
if(
v_state is not None and
v_state.channel is not None and
Expand All @@ -449,12 +474,13 @@ def valid_auto_channel(self, v_state):
else:
return False

async def after_ac_task(self, after, member=None):
"""
after_ac_task: handles the updates to the new channel the user entered
:param: object self: discord client
:param: object before: after state channel
"""
async def after_ac_task(self, after: discord.VoiceState, member=None) -> None:
"""_summary_
Args:
after (discord.VoiceState): _description_
member (_type_, optional): _description_. Defaults to None.
"""
cat = after.channel.category
category = self.autochannel.session.query(Category).get(cat.id)
db_channel_list_id = category.get_channels()
Expand All @@ -470,12 +496,13 @@ async def after_ac_task(self, after, member=None):
await self.queue.put(q_object)


async def before_ac_task(self, before, member=None):
"""
before_ac_task: handles the updates to the old channel the user left
:param: object self: discord client
:param: object before: before state channel
"""
async def before_ac_task(self, before: discord.VoiceState, member=None) -> None:
"""handles the updates to the old channel the user left
Args:
before (discord.VoiceState): before state channel
member (_type_, optional): _description_. Defaults to None.
"""
cat = before.channel.category
category = self.autochannel.session.query(Category).get(cat.id)
db_channel_list_id = category.get_channels()
Expand All @@ -495,7 +522,7 @@ async def before_ac_task(self, before, member=None):

@task_metrics_counter
@commands.Cog.listener()
async def on_guild_channel_delete(self, channel):
async def on_guild_channel_delete(self, channel) -> None:
"""This tracks all channel deletes and if the channel is tracked by auto-chan,
it will clean up the db entry. This is imporant due to admins deleting channels managed by auto-chan
that otherwise would get missed.
Expand All @@ -513,17 +540,17 @@ async def on_guild_channel_delete(self, channel):

@task_metrics_counter
@commands.Cog.listener()
async def on_voice_state_update(self, member, before, after):
"""
Checks before and afer states of channels users leave and join
async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState) -> None:
"""Checks before and afer states of channels users leave and join
this will handle logic to make sure there is no more than N (current
defaulted to 1) empty channel at any time. This will also create a
if less than N (current set to 1).
:param: object self: discord client
:param: object member: discord member
:param: object before: before voice channel object
:param: object after: after voice channel object
"""
Args:
member (discord.Member): discord member
before (discord.VoiceState): before voice channel object
after (discord.VoiceState): after voice channel object
"""
if(
before.channel is not None and
before.channel.category is not None and
Expand All @@ -540,37 +567,28 @@ async def on_voice_state_update(self, member, before, after):
if self.valid_auto_channel(after):
await self.after_ac_task(after, member=member)

def ac_channel_number(self, auto_channels):
"""
returns number from auto prefix and returns the lowest number
:param: object self: discord client
:param: objects auto_channels: List of voice_channels objects
:returns the lowest number missing from the sequence of voice_channels
"""
suffix_list = []
for channel in auto_channels:
suffix_list.append(int(''.join(channel.name.split(' ')[-1:])))
def ac_db_channel_number(self, db_cat) -> int:
"""returns number from auto prefix and returns the lowest number
return utils.missing_numbers(suffix_list)[0]
Args:
db_cat (_type_): List of voice_channels objects
def ac_db_channel_number(self, db_cat):
"""
returns number from auto prefix and returns the lowest number
:param: object self: discord client
:param: objects auto_channels: List of voice_channels objects
:returns the lowest number missing from the sequence of voice_channels
"""
Returns:
int: the lowest number missing from the sequence of voice_channels
"""
suffix_list = db_cat.get_chan_suffix()

return utils.missing_numbers(suffix_list)[0]

def get_ac_channel(self, auto_channel):
"""
returns the auto generated number suffix of a channel
:param: obejct self: discord client
:param: object auto_channel
:returns the channel suffix number
"""
def get_ac_channel(self, auto_channel) -> int:
"""returns the auto generated number suffix of a channel
Args:
auto_channel (_type_): _description_
Returns:
int: _description_
"""
return int(''.join(auto_channel.name.split(' ')[-1:]))

def get_ac_db_channel(self, auto_channel):
Expand All @@ -584,8 +602,18 @@ def get_ac_db_channel(self, auto_channel):
"""
return auto_channel.num_suffix

def vc_channel_number(self, interaction: discord.Interaction, category, categoryObj):
ac_channels = [vc for vc in interaction.guild.voice_channels if str(vc.category).lower().startswith(f'{category}') and vc.name.startswith(category.custom_prefix)]
def vc_channel_number(self, interaction: discord.Interaction, category, category_obj) -> int:
"""_summary_
Args:
interaction (discord.Interaction): _description_
category (_type_): _description_
categoryObj (_type_): _description_
Returns:
int: _description_
"""
ac_channels = [vc for vc in interaction.guild.voice_channels if str(vc.category).lower().startswith(f'{category}') and vc.name.startswith(category_obj.custom_prefix)]
return (len(ac_channels) + 1)

async def createVCInvite(self, interaction: discord.Interaction, voiceChannel) -> discord.Invite:
Expand Down

0 comments on commit 9506cbe

Please sign in to comment.