diff --git a/database.py b/database.py index a6d3837..337ef82 100644 --- a/database.py +++ b/database.py @@ -231,4 +231,21 @@ class PollVotes(Model): class Meta: table_name = "PollVotes" + database = db + +class KeywordNotify(Model): + """ID of the keyword notify entry""" + id = AutoField() + + """ID of the guild this belongs to""" + guild_id = BigIntegerField() + + """ID of the user this belongs to""" + user_id = BigIntegerField() + + """The keyword to match – in lower case""" + keyword = CharField(max_length=24) + + class Meta: + table_name = "KeywordNotify" database = db \ No newline at end of file diff --git a/heimdallr/Heimdallr.py b/heimdallr/Heimdallr.py index 3079fc5..c07cb04 100644 --- a/heimdallr/Heimdallr.py +++ b/heimdallr/Heimdallr.py @@ -291,6 +291,7 @@ def main(): bot.load_extension("heimdallr.commands.polls") bot.load_extension("heimdallr.commands.bot_messages") bot.load_extension("heimdallr.commands.modmail") + bot.load_extension("heimdallr.commands.keyword_notify")V bot.start(getenv("DISCORD_TOKEN")) diff --git a/heimdallr/commands/keyword_notify.py b/heimdallr/commands/keyword_notify.py new file mode 100644 index 0000000..0e2f8f0 --- /dev/null +++ b/heimdallr/commands/keyword_notify.py @@ -0,0 +1,214 @@ +import logging +from typing import List, Dict, Set +from naff import ( + AutocompleteContext, + Client, + Extension, + InteractionContext, + OptionTypes, + slash_command, + slash_option, + listen, + Button, + ButtonStyles, + events, + Message, + ChannelTypes, +) +from database import KeywordNotify as KeywordNotifyModel + + +class KeywordNotify(Extension): + def __init__(self, client: Client): + self.client: Client = client + self.guild_keywords: Dict[int, Set[str]] = {} + + @listen(events.Ready) + async def ready_init(self): + for guild in self.bot.guilds: + keywords_q: List[KeywordNotifyModel] = ( + KeywordNotifyModel.select(KeywordNotifyModel.keyword) + .where(KeywordNotifyModel.guild_id == guild.id) + .group_by(KeywordNotifyModel.keyword) + ) + + keywords = [str(kw.keyword) for kw in keywords_q] + self.guild_keywords[guild.id] = set(keywords) + + @slash_command( + name="keyword-notify", + description="Get notifications when messages containing certain words are sent.", + dm_permission=False, + default_member_permissions=None, + ) + async def keyword_notify(self, ctx: InteractionContext): + pass + + @keyword_notify.subcommand( + sub_cmd_name="add", + sub_cmd_description="Add a keyword to be notified about", + ) + @slash_option( + name="keyword", + description="The keyword", + opt_type=OptionTypes.STRING, + required=True, + max_length=16, + ) + async def keyword_notify_add(self, ctx: InteractionContext, kw: str): + keyword_q: KeywordNotifyModel + keyword_q, created = KeywordNotifyModel.get_or_create( + guild_id=ctx.guild.id, user_id=ctx.author.id, keyword=kw + ) + + if not created: + await ctx.send("Keyword already exists!", ephemeral=True) + return + + + self.guild_keywords[int(keyword_q.guild_id)].add(kw) + await ctx.send("Keyword created!", ephemeral=True) + + @keyword_notify.subcommand( + sub_cmd_name="remove", + sub_cmd_description="Remove a keyword to no longer be notified about it", + ) + @slash_option( + name="keyword", + description="The keyword to remove", + opt_type=OptionTypes.INTEGER, + required=True, + autocomplete=True, + ) + async def keyword_notify_remove(self, ctx: InteractionContext, kw_id: int): + guild_id = ctx.guild_id + user_id = ctx.author.id + + print(f"Got keyword: {kw_id}") + + keyword: KeywordNotifyModel = KeywordNotifyModel.get_or_none(id=kw_id) + + if keyword.user_id != ctx.author.id: + await ctx.send( + "An error occurred attempting to remove this keyword. Please contact the developer with error code: KEYWORDNOTIFY_ERR_INVALID_AUTHOR", + ephemeral=True, + ) + return + + keyword_name = keyword.keyword + + KeywordNotifyModel.delete_by_id(keyword.id) + + if ( + KeywordNotifyModel.select() + .where( + KeywordNotifyModel.keyword == keyword_name, + KeywordNotifyModel.guild_id == ctx.guild.id, + ) + .count() + == 0 + ): + self.guild_keywords[ctx.guild.id].remove(keyword_name) + await ctx.send(f"Keyword '{keyword_name}' was deleted", ephemeral=True) + + @keyword_notify_remove.autocomplete("keyword") + async def keyword_notify_remove_keyword_autocomplete( + self, ctx: AutocompleteContext, keyword: str + ): + keywords: List[KeywordNotifyModel] = get_user_keywords( + ctx.guild_id, ctx.author.id + ) + + choices = [{"name": x.keyword, "value": x.id} for x in keywords] + choices = list(filter(lambda x: x["name"].startswith(keyword), choices)) + + await ctx.send(choices=choices if len(choices) <= 25 else choices[:25]) + + @keyword_notify.subcommand( + sub_cmd_name="list", + sub_cmd_description="List the current keywords", + ) + async def keyword_notify_list(self, ctx: InteractionContext): + guild_id = ctx.guild_id + user_id = ctx.author.id + + keywords = get_user_keywords(guild_id, user_id) + + if len(keywords) == 0: + await ctx.send(ephemeral=True, content="You have no keywords.") + return + + keywords = [kw.keyword for kw in keywords] + + kw_string = "- " + "\n- ".join(keywords) + + await ctx.send( + ephemeral=True, + content=f"Your current keywords in this server are:" "\n" f"{kw_string}", + ) + + @listen(events.MessageCreate) + async def on_message_check_keyword(self, event: events.MessageCreate): + if event.message.channel.type in [ChannelTypes.DM, ChannelTypes.GROUP_DM]: + return + + guild = event.message.channel.guild + + self.guild_keywords[guild.id] + + for keyword in self.guild_keywords[guild.id]: + if keyword in event.message.content: + logging.info("Message event triggered in KeywordNotify") + keywords_q: List[KeywordNotifyModel] = KeywordNotifyModel.select( + KeywordNotifyModel.user_id + ).where( + KeywordNotifyModel.keyword == keyword, + KeywordNotifyModel.guild_id == guild.id, + ) + + users = [int(kw.user_id) for kw in keywords_q] + await self._send_message_to_user(users, keyword, event.message) + + async def _send_message_to_user( + self, users: List[int], keyword: str, message: Message + ): + quoted_message = "> " + message.content.replace("\n", "\n> ") + for user in users: + user = await self.bot.fetch_user(user) + if user is not None: + logging.info( + f"Attempting to send DM to {user.username}#{user.discriminator}" + ) + await user.send( + ( + f"A message has been sent with the keyword '{keyword}' in *{message.guild.name}*" + "\n\n" + f"{quoted_message}" + "\n" + f"{message.author.mention}" + ), + components=Button( + style=ButtonStyles.LINK, + label="Go to message", + url=message.jump_url, + ), + ) + + +def get_user_keywords(guild_id, user_id) -> list[KeywordNotifyModel]: + current_keywords: List[KeywordNotifyModel] = ( + KeywordNotifyModel.select() + .where( + KeywordNotifyModel.guild_id == guild_id, + KeywordNotifyModel.user_id == user_id, + ) + .execute() + ) + + return current_keywords + + +def setup(client: Client): + KeywordNotifyModel.create_table() + x = KeywordNotify(client) + logging.info(f"{KeywordNotify.__name__} extension loaded.")