import logging
from typing import Dict, List, Set

from interactions import (
    AutocompleteContext,
    Client,
    Extension,
    InteractionContext,
    OptionType,
    slash_command,
    slash_option,
    listen,
    Button,
    ButtonStyle,
    events,
    Message,
    ChannelType,
)

from database import KeywordNotify as KeywordNotifyModel

# Upper bound on the number of choices sent back in one autocomplete response.
MAX_AUTOCOMPLETE_CHOICES = 25


class KeywordNotify(Extension):
    """Extension that DMs users when a guild message contains their keyword.

    Keyword subscriptions are stored as ``KeywordNotifyModel`` rows
    (guild id, user id, keyword). ``guild_keywords`` is an in-memory cache of
    the distinct keywords per guild so that incoming messages can be screened
    without querying the database for every message.
    """

    def __init__(self, client: Client):
        self.client: Client = client
        # guild id -> set of keywords that at least one user in that guild
        # has subscribed to.
        self.guild_keywords: Dict[int, Set[str]] = {}

    @listen(events.Ready)
    async def ready_init(self):
        """Populate the per-guild keyword cache from the database."""
        for guild in self.bot.guilds:
            keywords_q = (
                KeywordNotifyModel.select(KeywordNotifyModel.keyword)
                .where(KeywordNotifyModel.guild_id == guild.id)
                .group_by(KeywordNotifyModel.keyword)
            )
            self.guild_keywords[guild.id] = {str(kw.keyword) for kw in keywords_q}

    @slash_command(
        name="keyword-notify",
        description="Get notifications when messages containing certain words are sent.",
        dm_permission=False,
        default_member_permissions=None,
    )
    async def keyword_notify(self, ctx: InteractionContext):
        """Base command; all behaviour lives in the subcommands below."""

    @keyword_notify.subcommand(
        sub_cmd_name="add",
        sub_cmd_description="Add a keyword to be notified about",
    )
    @slash_option(
        name="keyword",
        description="The keyword",
        opt_type=OptionType.STRING,
        required=True,
        max_length=16,
        # The option is called "keyword" but the callback parameter is "kw";
        # map one onto the other so the value is actually delivered.
        argument_name="kw",
    )
    async def keyword_notify_add(self, ctx: InteractionContext, kw: str):
        """Subscribe the invoking user to ``kw`` in the current guild.

        Args:
            ctx: Context of the slash command invocation.
            kw: The keyword to watch for.
        """
        keyword_q: KeywordNotifyModel
        keyword_q, created = KeywordNotifyModel.get_or_create(
            guild_id=ctx.guild.id, user_id=ctx.author.id, keyword=kw
        )
        if not created:
            await ctx.send("Keyword already exists!", ephemeral=True)
            return

        # The guild may be missing from the cache (e.g. it was not in
        # ``bot.guilds`` when Ready fired), so create its entry on demand.
        self.guild_keywords.setdefault(int(keyword_q.guild_id), set()).add(kw)
        await ctx.send("Keyword created!", ephemeral=True)

    @keyword_notify.subcommand(
        sub_cmd_name="remove",
        sub_cmd_description="Remove a keyword to no longer be notified about it",
    )
    @slash_option(
        name="keyword",
        description="The keyword to remove",
        opt_type=OptionType.INTEGER,
        required=True,
        autocomplete=True,
        # Option name and callback parameter name differ; see the "add" command.
        argument_name="kw_id",
    )
    async def keyword_notify_remove(self, ctx: InteractionContext, kw_id: int):
        """Delete one of the invoking user's keyword subscriptions.

        Args:
            ctx: Context of the slash command invocation.
            kw_id: Primary key of the ``KeywordNotifyModel`` row to delete, as
                supplied by the autocomplete choices.
        """
        logging.debug("Got keyword: %s", kw_id)

        keyword: KeywordNotifyModel = KeywordNotifyModel.get_or_none(id=kw_id)
        if keyword is None:
            # Stale autocomplete choice or a hand-typed id.
            await ctx.send("That keyword does not exist.", ephemeral=True)
            return

        # Ids are coerced with int() everywhere else in this module; do the
        # same here so the ownership check does not depend on the column type.
        if int(keyword.user_id) != int(ctx.author.id):
            await ctx.send(
                "An error occurred attempting to remove this keyword. Please contact the developer with error code: KEYWORDNOTIFY_ERR_INVALID_AUTHOR",
                ephemeral=True,
            )
            return

        keyword_name = keyword.keyword
        # Use the guild recorded on the row so the cache entry that gets
        # updated is the one the keyword actually belonged to.
        guild_id = int(keyword.guild_id)
        KeywordNotifyModel.delete_by_id(keyword.id)

        remaining = (
            KeywordNotifyModel.select()
            .where(
                KeywordNotifyModel.keyword == keyword_name,
                KeywordNotifyModel.guild_id == guild_id,
            )
            .count()
        )
        if remaining == 0:
            # Nobody else in the guild watches this keyword any more.
            # discard() tolerates a cache that is already out of sync.
            self.guild_keywords.get(guild_id, set()).discard(str(keyword_name))

        await ctx.send(f"Keyword '{keyword_name}' was deleted", ephemeral=True)

    @keyword_notify_remove.autocomplete("keyword")
    async def keyword_notify_remove_keyword_autocomplete(
        self, ctx: AutocompleteContext, keyword: str = ""
    ):
        """Offer the invoking user's keywords that start with the typed text.

        Args:
            ctx: Autocomplete context for the focused option.
            keyword: Text typed so far. Optional so the callback works both
                when the library passes the value as an argument and when it
                only provides it on the context.
        """
        # NOTE(review): falls back to ``ctx.input_text`` for library versions
        # that invoke autocomplete callbacks with the context only - confirm
        # against the installed interactions.py version.
        prefix = keyword or getattr(ctx, "input_text", "") or ""
        choices = [
            {"name": row.keyword, "value": row.id}
            for row in get_user_keywords(ctx.guild_id, ctx.author.id)
            if row.keyword.startswith(prefix)
        ]
        await ctx.send(choices=choices[:MAX_AUTOCOMPLETE_CHOICES])

    @keyword_notify.subcommand(
        sub_cmd_name="list",
        sub_cmd_description="List the current keywords",
    )
    async def keyword_notify_list(self, ctx: InteractionContext):
        """Show the invoking user their keywords for the current guild."""
        rows = get_user_keywords(ctx.guild_id, ctx.author.id)
        if not rows:
            await ctx.send(ephemeral=True, content="You have no keywords.")
            return

        kw_string = "- " + "\n- ".join(row.keyword for row in rows)
        await ctx.send(
            ephemeral=True,
            content="Your current keywords in this server are:\n" + kw_string,
        )

    @listen(events.MessageCreate)
    async def on_message_check_keyword(self, event: events.MessageCreate):
        """Notify subscribers when a guild message contains a cached keyword."""
        message = event.message
        if message.channel.type in (ChannelType.DM, ChannelType.GROUP_DM):
            return

        content = message.content
        if not content:
            return

        guild = message.channel.guild
        # Iterate over a snapshot: the loop body awaits, and the add/remove
        # commands may mutate the cached set in the meantime. A guild that is
        # not cached simply has no keywords to check.
        for keyword in tuple(self.guild_keywords.get(guild.id, ())):
            if keyword not in content:
                continue

            logging.info("Message event triggered in KeywordNotify")
            keywords_q = KeywordNotifyModel.select(KeywordNotifyModel.user_id).where(
                KeywordNotifyModel.keyword == keyword,
                KeywordNotifyModel.guild_id == guild.id,
            )
            users = [int(kw.user_id) for kw in keywords_q]
            await self._send_message_to_user(users, keyword, message)

    async def _send_message_to_user(
        self, users: List[int], keyword: str, message: Message
    ):
        """DM each user a quoted copy of ``message`` with a jump link.

        A failure for one recipient is logged and does not prevent the
        remaining recipients from being notified.

        Args:
            users: Ids of the users to notify.
            keyword: The keyword that matched.
            message: The message that contained the keyword.
        """
        quoted_message = "> " + message.content.replace("\n", "\n> ")
        # The notification text is identical for every recipient.
        notification = (
            f"A message has been sent with the keyword '{keyword}' in *{message.guild.name}*"
            "\n\n"
            f"{quoted_message}"
            "\n"
            f"{message.author.mention}"
        )

        for user_id in users:
            try:
                user = await self.bot.fetch_user(user_id)
                if user is None:
                    continue
                logging.info(
                    f"Attempting to send DM to {user.username}#{user.discriminator}"
                )
                await user.send(
                    notification,
                    components=Button(
                        style=ButtonStyle.LINK,
                        label="Go to message",
                        url=message.jump_url,
                    ),
                )
            except Exception:
                # Best effort per recipient (e.g. the user does not accept
                # DMs); keep going so the other subscribers are still told.
                logging.exception(
                    "Failed to send keyword notification to user %s", user_id
                )


def get_user_keywords(guild_id, user_id) -> list[KeywordNotifyModel]:
    """Return the keyword rows owned by ``user_id`` in ``guild_id``.

    Args:
        guild_id: Id of the guild to look in.
        user_id: Id of the user whose keywords are wanted.

    Returns:
        A list of matching ``KeywordNotifyModel`` rows; empty if there are none.
    """
    query = KeywordNotifyModel.select().where(
        KeywordNotifyModel.guild_id == guild_id,
        KeywordNotifyModel.user_id == user_id,
    )
    # Materialise the rows so the result really is the annotated list.
    return list(query)


def setup(client: Client):
    """Create the backing table and construct the extension for ``client``."""
    KeywordNotifyModel.create_table()
    KeywordNotify(client)
    logging.info(f"{KeywordNotify.__name__} extension loaded.")