From bf7d4ba23f25aa35bdf32be69946995daf6cfc87 Mon Sep 17 00:00:00 2001 From: Vegard Berg Date: Thu, 2 Mar 2023 21:44:39 +0100 Subject: [PATCH] Added `send_to_admins` utility method. Sends message to admin channel if it exists, otherwise to server owner. --- heimdallr/Heimdallr.py | 165 +++++++++++++++++++++++++++++++++-------- 1 file changed, 136 insertions(+), 29 deletions(-) diff --git a/heimdallr/Heimdallr.py b/heimdallr/Heimdallr.py index ce73ee5..3079fc5 100644 --- a/heimdallr/Heimdallr.py +++ b/heimdallr/Heimdallr.py @@ -1,15 +1,23 @@ import logging from os import getenv +from typing import Optional from naff import ( Client, + Guild, Intents, listen, slash_command, + slash_option, InteractionContext, AuditLogEventType, AuditLogEntry, RoleSelectMenu, + is_owner, + check, + OptionTypes, + GuildChannel, + DM, ) from naff.api import events from naff.models.discord.embed import ( @@ -22,21 +30,24 @@ from naff.models.discord.embed import ( from dotenv import load_dotenv from database import GuildSettings as GuildSettingsModel, JoinLeave + class HeimdallrClient(Client): @listen() async def on_ready(self): print("------------------------------------") print(f"Bot '{bot.user.username}' is ready!") print(f"This bot is owned by {bot.owner}") + print() print("------------------------------------") for guild in bot.guilds: - guild_q = GuildSettingsModel.get_or_none(GuildSettingsModel.guild_id == guild.id) + guild_q = GuildSettingsModel.get_or_none( + GuildSettingsModel.guild_id == guild.id + ) if guild_q is None: guild_q = GuildSettingsModel(guild_id=guild.id) guild_q.save() - @listen(events.MemberAdd) async def on_member_join(self, event: events.MemberAdd): joinleave_q: JoinLeave @@ -50,12 +61,12 @@ class HeimdallrClient(Client): return if joinleave_q.join_message is None or joinleave_q.join_message == "": - await channel.send( - f"{event.member.mention} has joined the server!" - ) + await channel.send(f"{event.member.mention} has joined the server!") return - await channel.send(str(joinleave_q.join_message).format(member=event.member, guild=event.guild)) + await channel.send( + str(joinleave_q.join_message).format(member=event.member, guild=event.guild) + ) @listen(events.MemberRemove) async def on_member_leave(self, event: events.MemberRemove): @@ -67,18 +78,19 @@ class HeimdallrClient(Client): entry: AuditLogEntry async for entry in event.guild.audit_log_history( - action_type=AuditLogEventType.MEMBER_KICK, - limit=10 + action_type=AuditLogEventType.MEMBER_KICK, limit=10 ): - if (entry.target_id != event.member.id or - guildsettings_q.admin_channel is None): + if ( + entry.target_id != event.member.id + or guildsettings_q.admin_channel is None + ): continue channel = await bot.fetch_channel(guildsettings_q.admin_channel) - await channel.send(f"{event.member.mention} was kicked with reason: {entry.reason}") + await channel.send( + f"{event.member.mention} was kicked with reason: {entry.reason}" + ) break - - if joinleave_q.message_channel is None: return @@ -88,22 +100,19 @@ class HeimdallrClient(Client): return if joinleave_q.leave_message is None or joinleave_q.leave_message == "": - await channel.send( - f"{event.member.mention} has left the server!" - ) + await channel.send(f"{event.member.mention} has left the server!") return - await channel.send(str(joinleave_q.leave_message).format(member=event.member, guild=event.guild)) - - + await channel.send( + str(joinleave_q.leave_message).format( + member=event.member, guild=event.guild + ) + ) @slash_command(name="ping", description="Ping the bot") async def ping_command(self, ctx: InteractionContext): ctx.ephemeral = True - await ctx.send("Pong!", - components=RoleSelectMenu(placeholder="HONK") - ) - + await ctx.send("Pong!", components=RoleSelectMenu(placeholder="HONK")) @slash_command(name="bot-info", description="Get info about the bot") async def bot_info_command(self, ctx: InteractionContext): @@ -130,20 +139,117 @@ class HeimdallrClient(Client): ), EmbedField( name="**Guilds**", - value="\n".join([g.name for g in sorted(bot.guilds, key=lambda g: g.name)]), + value="\n".join( + [g.name for g in sorted(bot.guilds, key=lambda g: g.name)] + ), ), ], ), ) -bot = HeimdallrClient(intents=Intents.ALL, debug_scope=387153131378835456, + @check(is_owner()) + @slash_command( + name="owner", + description="Owner-related commands", + dm_permission=True, + # Only available in the dev server. + scopes=[ + 387153131378835456, # BotTestServer + ], + ) + async def owner_cmd(self, ctx: InteractionContext): + pass + + @owner_cmd.subcommand( + sub_cmd_name="reload-module", + sub_cmd_description="Reload a bot module", + ) + @slash_option( + name="module", + description="The module to reload", + opt_type=OptionTypes.STRING, + required=True, + ) + async def owner_reload(self, ctx: InteractionContext, module: str): + await ctx.defer(ephemeral=True) + + mods = [ext.extension_name for ext in bot.get_extensions(module)] + + if len(mods) == 1: + module = mods[0] + elif len(mods) < 1: + await ctx.send("No modules with that name are loaded.") + return + else: + modnames = "\n".join(mods) + await ctx.send("**Found multiple matching modules:**\n" f"{modnames}") + + try: + self.reload_extension(module) + except Exception as e: + logging.warn(f"Failed to reload '{module}'.") + await ctx.send(f"Failed to reload '{module}'.") + else: + logging.info(f"Extension '{module}' reloaded.") + await ctx.send(f"Extension '{module}' reloaded.") + + async def send_message_to_admins( + self, + guild: Guild | int, + content: Optional[str] = None, + embed: Optional[Embed] = None, + embeds: list[Embed] | None = None + ): + """Send a message to the guild's admin channel or admin. + + Convenience method to send a message to the guild's admin/mod channel, + or to the owner of the server if an admin channel is not set. + """ + if embeds is None and embed is not None: + embeds = [embed] + elif embeds is not None and embed is not None: + embeds.append(embed) + elif content is None and embed is None and embeds is None: + raise ValueError("At least one of `content`, `embed`, or `embeds` must be set.") + + guild_id = None + match guild: + case Guild(): + guild_id = guild.id + case int(): + guild_id = guild + guild = await self.fetch_guild(guild_id) + case _: + raise TypeError("Argument `guild` must be of type Guild or int") + + guildsettings_q: GuildSettingsModel + guildsettings_q, _ = GuildSettingsModel.get_or_create(guild_id=guild_id) + + channel: Optional[GuildChannel | DM] + if guildsettings_q.admin_channel == None: + owner = await guild.fetch_owner() + channel = await owner.fetch_dm() + + content = ( + content + + "\nSet an admin channel to stop receiving these messages as DMs" + if content is not None + else "Set an admin channel to stop receiving these messages as DMs" + ) + else: + channel = await guild.fetch_channel(int(guildsettings_q.admin_channel)) + + await channel.send(content=content, embeds=embeds) + + +bot = HeimdallrClient( + intents=Intents.ALL, + debug_scope=387153131378835456, sync_interactions=True, fetch_members=True, - ) - def set_loglevel(level: str): loglevel = logging.WARNING @@ -169,7 +275,7 @@ def set_loglevel(level: str): def main(): - load_dotenv() # Load environment variables from .env file + load_dotenv() # Load environment variables from .env file set_loglevel(getenv("HEIMDALLR_LOGLEVEL")) # Create basic tables @@ -187,5 +293,6 @@ def main(): bot.load_extension("heimdallr.commands.modmail") bot.start(getenv("DISCORD_TOKEN")) + if __name__ == "__main__": - main() \ No newline at end of file + main()