forked from PyDrocsid/cogs
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
595 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from .cog import ContentFilterCog | ||
|
||
|
||
__all__ = ["ContentFilterCog"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,272 @@ | ||
import re | ||
|
||
from discord import Embed, Forbidden, Message | ||
from discord.ext import commands | ||
from discord.ext.commands import CommandError, Context, Converter, UserInputError, guild_only | ||
|
||
from PyDrocsid.cog import Cog | ||
from PyDrocsid.command import Confirmation, add_reactions, docs | ||
from PyDrocsid.database import db, filter_by, select | ||
from PyDrocsid.embeds import send_long_embed | ||
from PyDrocsid.emojis import name_to_emoji | ||
from PyDrocsid.environment import CACHE_TTL | ||
from PyDrocsid.events import StopEventHandling | ||
from PyDrocsid.redis import redis | ||
from PyDrocsid.translations import t | ||
|
||
from .colors import Colors | ||
from .models import BadWord, BadWordPost, sync_redis | ||
from .permissions import ContentFilterPermission | ||
from ...contributor import Contributor | ||
from ...pubsub import get_userlog_entries, send_alert, send_to_changelog | ||
|
||
|
||
tg = t.g | ||
t = t.content_filter | ||
|
||
|
||
class ContentFilterConverter(Converter): | ||
async def convert(self, ctx: Context, argument: str) -> BadWord: | ||
if argument.isnumeric(): | ||
row = await db.get(BadWord, id=int(argument)) | ||
else: | ||
row = await db.get(BadWord, regex=argument) | ||
|
||
if not row: | ||
raise CommandError(t.not_blacklisted) | ||
|
||
return row | ||
|
||
|
||
class RegexConverter(Converter): | ||
async def convert(self, ctx: Context, argument: str) -> str: | ||
try: | ||
re.compile(argument) | ||
except re.error: | ||
raise CommandError(t.invalid_regex) | ||
|
||
return argument | ||
|
||
|
||
def findall(regex: str, text: str) -> list[str]: | ||
return [match[0] for match in re.finditer(regex, text)] | ||
|
||
|
||
async def get_new_matches(message_id: int, matches: set[str]) -> set[str]: | ||
new_matches = matches - set(await redis.lrange(key := f"content_filter:alert:{message_id}", 0, -1)) | ||
|
||
async with redis.pipeline() as pipe: | ||
for match in new_matches: | ||
await pipe.lpush(key, match) | ||
await pipe.expire(key, CACHE_TTL) | ||
await pipe.execute() | ||
|
||
return new_matches | ||
|
||
|
||
async def check_message(message: Message) -> None: | ||
author = message.author | ||
|
||
if message.guild is None: | ||
return | ||
if await ContentFilterPermission.bypass.check_permissions(author): | ||
return | ||
|
||
violation_regexs: set[str] = set() | ||
violation_matches: set[str] = set() | ||
for regex in await BadWord.get_all_redis(): | ||
if not (matches := findall(regex, message.content)): | ||
continue | ||
|
||
violation_regexs.add(regex) | ||
violation_matches.update(matches) | ||
|
||
if not violation_regexs: | ||
return | ||
|
||
bad_words: list[BadWord] = await db.all(select(BadWord).where(BadWord.regex.in_(violation_regexs))) | ||
bad_word_ids: set[int] = {bad_word.id for bad_word in bad_words} | ||
delete_message = any(bad_word.delete for bad_word in bad_words) | ||
|
||
was_deleted = False | ||
if delete_message: | ||
try: | ||
await message.delete() | ||
except Forbidden: | ||
log_text = t.log_forbidden_posted_not_deleted | ||
else: | ||
was_deleted = True | ||
log_text = t.log_forbidden_posted_deleted | ||
else: | ||
log_text = t.log_forbidden_posted | ||
|
||
new_matches: set[str] = await get_new_matches(message.id, violation_matches) | ||
if new_matches: | ||
await send_alert( | ||
message.guild, | ||
log_text( | ||
f"{author.mention} (`@{author}`, {author.id})", | ||
message.jump_url, | ||
message.channel.mention, | ||
", ".join(sorted(new_matches)), | ||
", ".join(map(str, sorted(bad_word_ids))), | ||
), | ||
) | ||
|
||
for post in new_matches: | ||
await BadWordPost.create(author.id, author.name, message.channel.id, post, was_deleted) | ||
|
||
if was_deleted: | ||
raise StopEventHandling | ||
|
||
await message.add_reaction(name_to_emoji["warning"]) | ||
|
||
|
||
class ContentFilterCog(Cog, name="Content Filter"): | ||
CONTRIBUTORS = [Contributor.NekoFanatic, Contributor.Defelo] | ||
|
||
@get_userlog_entries.subscribe | ||
async def handle_get_ulog_entries(self, user_id: int, _): | ||
out = [] | ||
|
||
log: BadWordPost | ||
async for log in await db.stream(filter_by(BadWordPost, member=user_id)): | ||
if log.deleted_message: | ||
out.append((log.timestamp, t.ulog_message_deleted(log.content, log.channel))) | ||
else: | ||
out.append((log.timestamp, t.ulog_message(log.content, log.channel))) | ||
|
||
return out | ||
|
||
async def on_message(self, message: Message): | ||
await check_message(message) | ||
|
||
async def on_message_edit(self, _, after: Message): | ||
await check_message(after) | ||
|
||
@commands.group(name="content_filter", aliases=["cf"]) | ||
@ContentFilterPermission.read.check | ||
@guild_only() | ||
@docs(t.commands.content_filter) | ||
async def content_filter(self, ctx: Context): | ||
if ctx.subcommand_passed is not None: | ||
if ctx.invoked_subcommand is None: | ||
raise UserInputError | ||
return | ||
|
||
embed = Embed(title=t.bad_word_list_header, colour=Colors.ContentFilter) | ||
|
||
reg: BadWord | ||
async for reg in await db.stream(select(BadWord)): | ||
embed.add_field( | ||
name=t.embed_field_name(reg.id, reg.description), | ||
value=t.embed_field_value(reg.regex, t.delete if reg.delete else t.not_delete), | ||
inline=False, | ||
) | ||
|
||
if not embed.fields: | ||
embed.colour = Colors.error | ||
embed.description = t.no_pattern_listed | ||
|
||
await send_long_embed(ctx, embed, paginate=True, max_fields=6) | ||
|
||
@content_filter.command(name="add", aliases=["a", "+"]) | ||
@ContentFilterPermission.write.check | ||
@docs(t.commands.add) | ||
async def add(self, ctx: Context, regex: RegexConverter, delete: bool, *, description: str): | ||
regex: str | ||
|
||
if await db.exists(filter_by(BadWord, regex=regex)): | ||
raise CommandError(t.already_blacklisted) | ||
|
||
if len(description) > 500: | ||
raise CommandError(t.description_length) | ||
|
||
await BadWord.create(regex, description, delete) | ||
await add_reactions(ctx.message, "white_check_mark") | ||
await send_to_changelog(ctx.guild, t.log_content_filter_added(regex, ctx.author.mention)) | ||
|
||
@content_filter.command(name="remove", aliases=["del", "r", "d", "-"]) | ||
@ContentFilterPermission.write.check | ||
@docs(t.commands.remove) | ||
async def remove(self, ctx: Context, pattern: ContentFilterConverter): | ||
pattern: BadWord | ||
|
||
if not await Confirmation().run(ctx, t.confirm_text(pattern.regex, pattern.description)): | ||
return | ||
|
||
await pattern.remove() | ||
await add_reactions(ctx.message, "white_check_mark") | ||
await send_to_changelog(ctx.guild, t.log_content_filter_removed(pattern.regex, ctx.author.mention)) | ||
|
||
@content_filter.group(name="update", aliases=["u"]) | ||
@ContentFilterPermission.write.check | ||
@docs(t.commands.update) | ||
async def update(self, ctx: Context): | ||
if ctx.invoked_subcommand is None: | ||
raise UserInputError | ||
|
||
@update.command(name="description", aliases=["d"]) | ||
@docs(t.commands.update_description) | ||
async def description(self, ctx: Context, pattern: ContentFilterConverter, *, new_description: str): | ||
pattern: BadWord | ||
if len(new_description) > 500: | ||
raise CommandError(t.description_length) | ||
|
||
old = pattern.description | ||
pattern.description = new_description | ||
|
||
await add_reactions(ctx.message, "white_check_mark") | ||
await send_to_changelog(ctx.guild, t.log_description_updated(pattern.regex, old, new_description)) | ||
|
||
@update.command(name="regex", aliases=["r"]) | ||
@docs(t.commands.update_regex) | ||
async def regex(self, ctx: Context, pattern: ContentFilterConverter, *, new_regex: RegexConverter): | ||
pattern: BadWord | ||
new_regex: str | ||
|
||
if await db.exists(filter_by(BadWord, regex=new_regex)): | ||
raise CommandError(t.already_blacklisted) | ||
|
||
old = pattern.regex | ||
pattern.regex = new_regex | ||
await sync_redis() | ||
|
||
await add_reactions(ctx.message, "white_check_mark") | ||
await send_to_changelog(ctx.guild, t.log_regex_updated(old, pattern.regex)) | ||
|
||
@update.command(name="delete_message", aliases=["del", "del_message", "dm"]) | ||
@docs(t.commands.delete_message) | ||
async def delete_message(self, ctx: Context, pattern: ContentFilterConverter, delete: bool): | ||
pattern: BadWord | ||
pattern.delete = delete | ||
|
||
await add_reactions(ctx.message, "white_check_mark") | ||
await send_to_changelog(ctx.guild, t.log_delete_updated(pattern.delete, pattern.regex)) | ||
|
||
@content_filter.command(name="check", aliases=["c"]) | ||
@ContentFilterPermission.read.check | ||
@docs(t.commands.check) | ||
async def check(self, ctx: Context, pattern: ContentFilterConverter | int | RegexConverter, *, test_string: str): | ||
filters: list[BadWord | str] | ||
if isinstance(pattern, (BadWord, str)): | ||
filters = [pattern] | ||
elif pattern == -1: | ||
filters = await db.all(select(BadWord)) | ||
else: | ||
raise CommandError(t.invalid_pattern) | ||
|
||
out = [] | ||
for rule in filters: | ||
regex = rule.regex if isinstance(rule, BadWord) else rule | ||
if not (matches := findall(regex, test_string)): | ||
continue | ||
|
||
line = f"{rule.id}: " if isinstance(rule, BadWord) else "" | ||
line += f'`{regex}` -> {", ".join(f"`{m}`" for m in sorted(set(matches)))}' | ||
out.append(line) | ||
|
||
embed = Embed(title=t.checked_expressions, description=f"**{test_string}**", color=Colors.ContentFilter) | ||
embed.add_field(name=t.matches, value="\n".join(out) or t.no_matches) | ||
|
||
await send_long_embed(ctx, embed, paginate=True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from PyDrocsid.material_colors import MaterialColors | ||
|
||
|
||
class Colors(MaterialColors): | ||
ContentFilter = MaterialColors.bluegrey[400] |
Oops, something went wrong.