Added `send_to_admins` utility method.
continuous-integration/drone/push Build is passing Details

Sends message to admin channel if it exists,
otherwise to server owner.
This commit is contained in:
Vegard Berg 2023-03-02 21:44:39 +01:00
parent 27254e50e9
commit bf7d4ba23f
1 changed files with 136 additions and 29 deletions

View File

@ -1,15 +1,23 @@
import logging import logging
from os import getenv from os import getenv
from typing import Optional
from naff import ( from naff import (
Client, Client,
Guild,
Intents, Intents,
listen, listen,
slash_command, slash_command,
slash_option,
InteractionContext, InteractionContext,
AuditLogEventType, AuditLogEventType,
AuditLogEntry, AuditLogEntry,
RoleSelectMenu, RoleSelectMenu,
is_owner,
check,
OptionTypes,
GuildChannel,
DM,
) )
from naff.api import events from naff.api import events
from naff.models.discord.embed import ( from naff.models.discord.embed import (
@ -22,21 +30,24 @@ from naff.models.discord.embed import (
from dotenv import load_dotenv from dotenv import load_dotenv
from database import GuildSettings as GuildSettingsModel, JoinLeave from database import GuildSettings as GuildSettingsModel, JoinLeave
class HeimdallrClient(Client): class HeimdallrClient(Client):
@listen() @listen()
async def on_ready(self): async def on_ready(self):
print("------------------------------------") print("------------------------------------")
print(f"Bot '{bot.user.username}' is ready!") print(f"Bot '{bot.user.username}' is ready!")
print(f"This bot is owned by {bot.owner}") print(f"This bot is owned by {bot.owner}")
print()
print("------------------------------------") print("------------------------------------")
for guild in bot.guilds: for guild in bot.guilds:
guild_q = GuildSettingsModel.get_or_none(GuildSettingsModel.guild_id == guild.id) guild_q = GuildSettingsModel.get_or_none(
GuildSettingsModel.guild_id == guild.id
)
if guild_q is None: if guild_q is None:
guild_q = GuildSettingsModel(guild_id=guild.id) guild_q = GuildSettingsModel(guild_id=guild.id)
guild_q.save() guild_q.save()
@listen(events.MemberAdd) @listen(events.MemberAdd)
async def on_member_join(self, event: events.MemberAdd): async def on_member_join(self, event: events.MemberAdd):
joinleave_q: JoinLeave joinleave_q: JoinLeave
@ -50,12 +61,12 @@ class HeimdallrClient(Client):
return return
if joinleave_q.join_message is None or joinleave_q.join_message == "": if joinleave_q.join_message is None or joinleave_q.join_message == "":
await channel.send( await channel.send(f"{event.member.mention} has joined the server!")
f"{event.member.mention} has joined the server!"
)
return return
await channel.send(str(joinleave_q.join_message).format(member=event.member, guild=event.guild)) await channel.send(
str(joinleave_q.join_message).format(member=event.member, guild=event.guild)
)
@listen(events.MemberRemove) @listen(events.MemberRemove)
async def on_member_leave(self, event: events.MemberRemove): async def on_member_leave(self, event: events.MemberRemove):
@ -67,19 +78,20 @@ class HeimdallrClient(Client):
entry: AuditLogEntry entry: AuditLogEntry
async for entry in event.guild.audit_log_history( async for entry in event.guild.audit_log_history(
action_type=AuditLogEventType.MEMBER_KICK, action_type=AuditLogEventType.MEMBER_KICK, limit=10
limit=10
): ):
if (entry.target_id != event.member.id or if (
guildsettings_q.admin_channel is None): entry.target_id != event.member.id
or guildsettings_q.admin_channel is None
):
continue continue
channel = await bot.fetch_channel(guildsettings_q.admin_channel) channel = await bot.fetch_channel(guildsettings_q.admin_channel)
await channel.send(f"{event.member.mention} was kicked with reason: {entry.reason}") await channel.send(
f"{event.member.mention} was kicked with reason: {entry.reason}"
)
break break
if joinleave_q.message_channel is None: if joinleave_q.message_channel is None:
return return
channel = await bot.fetch_channel(joinleave_q.message_channel) channel = await bot.fetch_channel(joinleave_q.message_channel)
@ -88,22 +100,19 @@ class HeimdallrClient(Client):
return return
if joinleave_q.leave_message is None or joinleave_q.leave_message == "": if joinleave_q.leave_message is None or joinleave_q.leave_message == "":
await channel.send( await channel.send(f"{event.member.mention} has left the server!")
f"{event.member.mention} has left the server!"
)
return return
await channel.send(str(joinleave_q.leave_message).format(member=event.member, guild=event.guild)) await channel.send(
str(joinleave_q.leave_message).format(
member=event.member, guild=event.guild
)
)
@slash_command(name="ping", description="Ping the bot") @slash_command(name="ping", description="Ping the bot")
async def ping_command(self, ctx: InteractionContext): async def ping_command(self, ctx: InteractionContext):
ctx.ephemeral = True ctx.ephemeral = True
await ctx.send("Pong!", await ctx.send("Pong!", components=RoleSelectMenu(placeholder="HONK"))
components=RoleSelectMenu(placeholder="HONK")
)
@slash_command(name="bot-info", description="Get info about the bot") @slash_command(name="bot-info", description="Get info about the bot")
async def bot_info_command(self, ctx: InteractionContext): async def bot_info_command(self, ctx: InteractionContext):
@ -130,20 +139,117 @@ class HeimdallrClient(Client):
), ),
EmbedField( EmbedField(
name="**Guilds**", name="**Guilds**",
value="\n".join([g.name for g in sorted(bot.guilds, key=lambda g: g.name)]), value="\n".join(
[g.name for g in sorted(bot.guilds, key=lambda g: g.name)]
),
), ),
], ],
), ),
) )
bot = HeimdallrClient(intents=Intents.ALL, debug_scope=387153131378835456, @check(is_owner())
@slash_command(
name="owner",
description="Owner-related commands",
dm_permission=True,
# Only available in the dev server.
scopes=[
387153131378835456, # BotTestServer
],
)
async def owner_cmd(self, ctx: InteractionContext):
pass
@owner_cmd.subcommand(
sub_cmd_name="reload-module",
sub_cmd_description="Reload a bot module",
)
@slash_option(
name="module",
description="The module to reload",
opt_type=OptionTypes.STRING,
required=True,
)
async def owner_reload(self, ctx: InteractionContext, module: str):
await ctx.defer(ephemeral=True)
mods = [ext.extension_name for ext in bot.get_extensions(module)]
if len(mods) == 1:
module = mods[0]
elif len(mods) < 1:
await ctx.send("No modules with that name are loaded.")
return
else:
modnames = "\n".join(mods)
await ctx.send("**Found multiple matching modules:**\n" f"{modnames}")
try:
self.reload_extension(module)
except Exception as e:
logging.warn(f"Failed to reload '{module}'.")
await ctx.send(f"Failed to reload '{module}'.")
else:
logging.info(f"Extension '{module}' reloaded.")
await ctx.send(f"Extension '{module}' reloaded.")
async def send_message_to_admins(
self,
guild: Guild | int,
content: Optional[str] = None,
embed: Optional[Embed] = None,
embeds: list[Embed] | None = None
):
"""Send a message to the guild's admin channel or admin.
Convenience method to send a message to the guild's admin/mod channel,
or to the owner of the server if an admin channel is not set.
"""
if embeds is None and embed is not None:
embeds = [embed]
elif embeds is not None and embed is not None:
embeds.append(embed)
elif content is None and embed is None and embeds is None:
raise ValueError("At least one of `content`, `embed`, or `embeds` must be set.")
guild_id = None
match guild:
case Guild():
guild_id = guild.id
case int():
guild_id = guild
guild = await self.fetch_guild(guild_id)
case _:
raise TypeError("Argument `guild` must be of type Guild or int")
guildsettings_q: GuildSettingsModel
guildsettings_q, _ = GuildSettingsModel.get_or_create(guild_id=guild_id)
channel: Optional[GuildChannel | DM]
if guildsettings_q.admin_channel == None:
owner = await guild.fetch_owner()
channel = await owner.fetch_dm()
content = (
content
+ "\nSet an admin channel to stop receiving these messages as DMs"
if content is not None
else "Set an admin channel to stop receiving these messages as DMs"
)
else:
channel = await guild.fetch_channel(int(guildsettings_q.admin_channel))
await channel.send(content=content, embeds=embeds)
bot = HeimdallrClient(
intents=Intents.ALL,
debug_scope=387153131378835456,
sync_interactions=True, sync_interactions=True,
fetch_members=True, fetch_members=True,
) )
def set_loglevel(level: str): def set_loglevel(level: str):
loglevel = logging.WARNING loglevel = logging.WARNING
@ -169,7 +275,7 @@ def set_loglevel(level: str):
def main(): def main():
load_dotenv() # Load environment variables from .env file load_dotenv() # Load environment variables from .env file
set_loglevel(getenv("HEIMDALLR_LOGLEVEL")) set_loglevel(getenv("HEIMDALLR_LOGLEVEL"))
# Create basic tables # Create basic tables
@ -187,5 +293,6 @@ def main():
bot.load_extension("heimdallr.commands.modmail") bot.load_extension("heimdallr.commands.modmail")
bot.start(getenv("DISCORD_TOKEN")) bot.start(getenv("DISCORD_TOKEN"))
if __name__ == "__main__": if __name__ == "__main__":
main() main()