# Heimdallr/heimdallr/Heimdallr.py
#
# 300 lines
# 9.3 KiB
# Python

import logging
from os import getenv
from typing import Optional
from naff import (
Client,
Guild,
Intents,
listen,
slash_command,
slash_option,
InteractionContext,
AuditLogEventType,
AuditLogEntry,
RoleSelectMenu,
is_owner,
check,
OptionTypes,
GuildChannel,
DM,
)
from naff.api import events
from naff.models.discord.embed import (
Embed,
EmbedAuthor,
EmbedField,
EmbedFooter,
)
from dotenv import load_dotenv
from database import GuildSettings as GuildSettingsModel, JoinLeave
class HeimdallrClient(Client):
@listen()
async def on_ready(self):
print("------------------------------------")
print(f"Bot '{bot.user.username}' is ready!")
print(f"This bot is owned by {bot.owner}")
print()
print("------------------------------------")
for guild in bot.guilds:
guild_q = GuildSettingsModel.get_or_none(
GuildSettingsModel.guild_id == guild.id
)
if guild_q is None:
guild_q = GuildSettingsModel(guild_id=guild.id)
guild_q.save()
@listen(events.MemberAdd)
async def on_member_join(self, event: events.MemberAdd):
joinleave_q: JoinLeave
joinleave_q, _ = JoinLeave.get_or_create(guild_id=event.guild.id)
if joinleave_q.message_channel is None:
return
channel = await bot.fetch_channel(joinleave_q.message_channel)
if not joinleave_q.join_message_enabled:
return
if joinleave_q.join_message is None or joinleave_q.join_message == "":
await channel.send(f"{event.member.mention} has joined the server!")
return
await channel.send(
str(joinleave_q.join_message).format(member=event.member, guild=event.guild)
)
@listen(events.MemberRemove)
async def on_member_leave(self, event: events.MemberRemove):
joinleave_q: JoinLeave
joinleave_q, _ = JoinLeave.get_or_create(guild_id=event.guild.id)
guildsettings_q: GuildSettingsModel
guildsettings_q, _ = GuildSettingsModel.get_or_create(guild_id=event.guild.id)
entry: AuditLogEntry
async for entry in event.guild.audit_log_history(
action_type=AuditLogEventType.MEMBER_KICK, limit=10
):
if (
entry.target_id != event.member.id
or guildsettings_q.admin_channel is None
):
continue
channel = await bot.fetch_channel(guildsettings_q.admin_channel)
await channel.send(
f"{event.member.mention} was kicked with reason: {entry.reason}"
)
break
if joinleave_q.message_channel is None:
return
channel = await bot.fetch_channel(joinleave_q.message_channel)
if not joinleave_q.leave_message_enabled:
return
if joinleave_q.leave_message is None or joinleave_q.leave_message == "":
await channel.send(f"{event.member.mention} has left the server!")
return
await channel.send(
str(joinleave_q.leave_message).format(
member=event.member, guild=event.guild
)
)
@slash_command(name="ping", description="Ping the bot")
async def ping_command(self, ctx: InteractionContext):
ctx.ephemeral = True
await ctx.send("Pong!", components=RoleSelectMenu(placeholder="HONK"))
@slash_command(name="bot-info", description="Get info about the bot")
async def bot_info_command(self, ctx: InteractionContext):
await ctx.send(
ephemeral=True,
embed=Embed(
title=f"{bot.user.username}",
description=f"This bot is owned by {bot.owner}",
color=0x00FF00,
timestamp=bot.user.created_at,
url=bot.user.avatar.as_url(),
author=EmbedAuthor(
name=bot.user.username,
icon_url=bot.user.avatar.as_url(),
),
footer=EmbedFooter(
text=f"Created at {bot.user.created_at}",
icon_url=bot.user.avatar.as_url(),
),
fields=[
EmbedField(
name="**Extensions**",
value=f"{', '.join(bot.ext.keys())}",
),
EmbedField(
name="**Guilds**",
value="\n".join(
[g.name for g in sorted(bot.guilds, key=lambda g: g.name)]
),
),
],
),
)
@check(is_owner())
@slash_command(
name="owner",
description="Owner-related commands",
dm_permission=True,
# Only available in the dev server.
scopes=[
387153131378835456, # BotTestServer
],
)
async def owner_cmd(self, ctx: InteractionContext):
pass
@owner_cmd.subcommand(
sub_cmd_name="reload-module",
sub_cmd_description="Reload a bot module",
)
@slash_option(
name="module",
description="The module to reload",
opt_type=OptionTypes.STRING,
required=True,
)
async def owner_reload(self, ctx: InteractionContext, module: str):
await ctx.defer(ephemeral=True)
mods = [ext.extension_name for ext in bot.get_extensions(module)]
if len(mods) == 1:
module = mods[0]
elif len(mods) < 1:
await ctx.send("No modules with that name are loaded.")
return
else:
modnames = "\n".join(mods)
await ctx.send("**Found multiple matching modules:**\n" f"{modnames}")
try:
self.reload_extension(module)
except Exception as e:
logging.warn(f"Failed to reload '{module}'.")
await ctx.send(f"Failed to reload '{module}'.")
else:
logging.info(f"Extension '{module}' reloaded.")
await ctx.send(f"Extension '{module}' reloaded.")
async def send_message_to_admins(
self,
guild: Guild | int,
content: Optional[str] = None,
embed: Optional[Embed] = None,
embeds: list[Embed] | None = None
):
"""Send a message to the guild's admin channel or admin.
Convenience method to send a message to the guild's admin/mod channel,
or to the owner of the server if an admin channel is not set.
"""
if embeds is None and embed is not None:
embeds = [embed]
elif embeds is not None and embed is not None:
embeds.append(embed)
elif content is None and embed is None and embeds is None:
raise ValueError("At least one of `content`, `embed`, or `embeds` must be set.")
guild_id = None
match guild:
case Guild():
guild_id = guild.id
case int():
guild_id = guild
guild = await self.fetch_guild(guild_id)
case _:
raise TypeError("Argument `guild` must be of type Guild or int")
guildsettings_q: GuildSettingsModel
guildsettings_q, _ = GuildSettingsModel.get_or_create(guild_id=guild_id)
channel: Optional[GuildChannel | DM]
if guildsettings_q.admin_channel == None:
owner = await guild.fetch_owner()
channel = await owner.fetch_dm()
content = (
content
+ "\nSet an admin channel to stop receiving these messages as DMs"
if content is not None
else "Set an admin channel to stop receiving these messages as DMs"
)
else:
channel = await guild.fetch_channel(int(guildsettings_q.admin_channel))
await channel.send(content=content, embeds=embeds)
# Module-level client instance; main() loads the extensions onto it and
# starts it.
bot = HeimdallrClient(
    # Same guild ID as the owner command's scope (labelled BotTestServer).
    debug_scope=387153131378835456,
    fetch_members=True,
    intents=Intents.ALL,
    sync_interactions=True,
)
def set_loglevel(level: str):
loglevel = logging.WARNING
match str(level).lower().strip():
case "d" | "debug":
loglevel = logging.DEBUG
case "i" | "info" | "information":
loglevel = logging.INFO
case "w" | "warn" | "warning":
loglevel = logging.WARNING
case "e" | "error":
loglevel = logging.ERROR
case "c" | "critical":
loglevel = logging.CRITICAL
case _:
loglevel = logging.WARNING
logging.basicConfig(level=loglevel)
def main():
    """Configure the environment, load all extensions, and start the bot.

    Reads HEIMDALLR_LOGLEVEL and DISCORD_TOKEN from the environment, after
    loading a `.env` file.

    Raises:
        SystemExit: If DISCORD_TOKEN is unset or empty.
    """
    load_dotenv()  # Load environment variables from .env file
    set_loglevel(getenv("HEIMDALLR_LOGLEVEL"))

    # Create basic tables
    GuildSettingsModel.create_table()
    JoinLeave.create_table()

    # Load extensions
    extensions = (
        "heimdallr.commands.admin",
        "heimdallr.commands.gatekeep",
        "heimdallr.commands.quote",
        "heimdallr.commands.infractions",
        "heimdallr.commands.self_roles",
        "heimdallr.commands.polls",
        "heimdallr.commands.bot_messages",
        "heimdallr.commands.modmail",
        "heimdallr.commands.keyword_notify",
    )
    for extension in extensions:
        bot.load_extension(extension)

    # Fail with a clear message instead of handing None to the client.
    token = getenv("DISCORD_TOKEN")
    if not token:
        raise SystemExit("DISCORD_TOKEN is not set; cannot start the bot.")
    bot.start(token)


# Start the bot only when this file is executed as a script.
if __name__ == "__main__":
    main()