Heimdallr/commands/gatekeep.py

418 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import random, string
from io import BytesIO
from naff import (
Extension,
Client,
slash_command,
slash_option,
context_menu,
listen,
CommandTypes,
InteractionContext,
OptionTypes,
Role,
Member,
File,
SlashCommandChoice,
Button,
ButtonStyles,
Embed,
)
from naff.api import events
from naff.models.discord.enums import Permissions
from captcha.image import ImageCaptcha
from captcha.audio import AudioCaptcha
from database import (
GuildSettings as GuildSettingsModel,
Gatekeep as GatekeepModel,
JoinLeave as JoinLeaveModel,
GatekeepCaptchas as GatekeepCaptchasModel,
)
class Gatekeep(Extension):
def __init__(self, client: Client):
self.client = client
@slash_command(
name="gatekeep",
description="The gatekeep system",
sub_cmd_name="set-type",
sub_cmd_description="Set how Gatekeep allows users to join",
dm_permission=False,
default_member_permissions=Permissions.MANAGE_GUILD,
)
@slash_option(
name="type",
description="Type of Gatekeep",
required=True,
opt_type=OptionTypes.STRING,
choices=[
SlashCommandChoice(
name="Manual",
value="manual",
),
SlashCommandChoice(
name="Captcha",
value="captcha",
),
],
)
async def set_type_command(self, ctx: InteractionContext, type: str):
gk: GatekeepModel
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
gk.gatekeep_method = type
gk.save()
await ctx.send(f"Gatekeep type set to {type}", ephemeral=True)
@slash_command(
name="gatekeep",
description="The gatekeep system",
sub_cmd_name="set-role",
sub_cmd_description="Set the role that users gain upon approval",
dm_permission=False,
default_member_permissions=Permissions.MANAGE_GUILD,
)
@slash_option(
name="role",
description="Role to grant",
required=True,
opt_type=OptionTypes.ROLE,
)
async def set_role_command(self, ctx: InteractionContext, role: Role):
gk: GatekeepModel
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
gk.gatekeep_approve_role = role.id
gk.save()
await ctx.send(f"Gatekeep role set to {role.name}", ephemeral=True)
@slash_command(
name="gatekeep",
description="The gatekeep system",
sub_cmd_name="set-message",
sub_cmd_description="Set the message that users see when they are approved",
dm_permission=False,
default_member_permissions=Permissions.MANAGE_GUILD,
)
@slash_option(
name="message",
description="Message to send",
required=True,
opt_type=OptionTypes.STRING,
)
async def set_message_command(self, ctx: InteractionContext, message: str):
gk: GatekeepModel
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
gk.gatekeep_approve_message = message
gk.save()
await ctx.send(f"Gatekeep message set to {message}", ephemeral=True)
@slash_command(
name="approve",
description="Approve a user",
dm_permission=False,
default_member_permissions=Permissions.MANAGE_ROLES,
)
@slash_option(
name="user",
description="User to approve",
required=True,
opt_type=OptionTypes.USER,
)
async def approve_command(self, ctx: InteractionContext, user: Member):
gk: GatekeepModel
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
jl: JoinLeaveModel
jl, _ = JoinLeaveModel.get_or_create(guild_id=ctx.guild.id)
await user.add_role(int(gk.gatekeep_approve_role))
# Check if a welcome channel is set
welcome_channel = not jl.message_channel is None
# If there is no approval message set, inform the issuer privately.
if gk.gatekeep_approve_message is None:
await ctx.send(
f"{user.mention} has been approved.\nNB: No approval message has been sent.",
ephemeral=True,
)
return
# If there is no welcome channel set, attempt to DM the approval message to the user.
if not welcome_channel:
await ctx.send(
f"{user.mention} has been approved.\nNB: No welcome channel has been set attempting to DM {user.mention}",
ephemeral=True,
)
await user.send(
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
)
return
# DM the user if the bot fails to retrieve the welcome channel.
channel = await ctx.guild.fetch_channel(jl.message_channel)
if not channel:
await ctx.send(
f"{user.mention} has been approved.\nNB: No welcome channel has been set attempting to DM {user.mention}",
ephemeral=True,
)
await user.send(
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
)
return
# If none of the above occur, finally send the approval message to the welcome channel.
await channel.send(
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
)
await ctx.send(f"{user.mention} has been approved.", ephemeral=True)
@context_menu(
name="Approve User",
dm_permission=False,
default_member_permissions=Permissions.MANAGE_ROLES,
context_type=CommandTypes.USER,
)
async def approve_context_menu(self, ctx: InteractionContext):
user: Member = ctx.target
gk: GatekeepModel
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
jl: JoinLeaveModel
jl, _ = JoinLeaveModel.get_or_create(guild_id=ctx.guild.id)
await user.add_role(int(gk.gatekeep_approve_role))
welcome_channel = not jl.message_channel is None
if gk.gatekeep_approve_message is None:
await ctx.send(
f"{user.mention} has been approved.\nNB: No approval message has been sent.",
ephemeral=True,
)
return
if not welcome_channel:
await ctx.send(
f"{user.mention} has been approved.\nNB: No welcome channel has been set attempting to DM {user.mention}",
ephemeral=True,
)
await user.send(
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
)
return
channel = await ctx.guild.fetch_channel(jl.message_channel)
if not channel:
await ctx.send(
f"{user.mention} has been approved.\nNB: No welcome channel has been set attempting to DM {user.mention}",
ephemeral=True,
)
await user.send(
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
)
return
await channel.send(
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
)
await ctx.send(f"{user.mention} has been approved.", ephemeral=True)
# Allow the use of a reaction to approve a user.
# This is mainly for compatibility with NLL.
# The permission thingy should probably be reworked, as it currently allows anyone
# with the manage roles permission to use this.
# TODO: Rewrite this to require a specific role.
@listen(events.MessageReactionAdd)
async def on_reaction_add(self, reaction: events.MessageReactionAdd):
if not reaction.emoji.name in [
"",
"",
"",
] or not reaction.author.has_permission(Permissions.MANAGE_ROLES):
return
gk: GatekeepModel
gk, _ = GatekeepModel.get_or_create(guild_id=reaction.guild.id)
jl: JoinLeaveModel
jl, _ = JoinLeaveModel.get_or_create(guild_id=reaction.guild.id)
if reaction.message.channel.id != jl.message_channel:
return
await reaction.message.author.add_role(int(gk.gatekeep_approve_role))
await reaction.message.channel.send(
str(gk.gatekeep_approve_message).format(
member=reaction.message.author, guild=reaction.guild
)
)
@slash_command(
name="captcha",
description="Complete a captcha",
dm_permission=False,
)
@slash_option(
name="captcha",
description="The captcha solution",
required=True,
opt_type=OptionTypes.STRING,
)
async def captcha_command(self, ctx: InteractionContext, captcha: str):
gk: GatekeepModel = GatekeepModel.get_or_none(
GatekeepModel.guild_id == ctx.guild.id,
)
if gk is None:
await ctx.send("No gatekeep set", ephemeral=True)
return
gkc: GatekeepCaptchasModel = GatekeepCaptchasModel.get_or_none(
GatekeepCaptchasModel.guild_id == ctx.guild.id,
GatekeepCaptchasModel.user_id == ctx.author.id,
)
if gkc is None:
await ctx.send("You have no captcha to complete.", ephemeral=True)
return
if gkc.captcha != captcha:
await ctx.send("Your captcha was incorrect.", ephemeral=True)
return
gkc.delete_instance()
await ctx.author.add_role(int(gk.gatekeep_approve_role))
await ctx.send(
str(gk.gatekeep_approve_message).format(member=ctx.author, guild=ctx.guild),
ephemeral=False,
)
@listen(events.MemberAdd)
async def on_member_join(self, member_add: events.MemberAdd):
member = member_add.member
gk: GatekeepModel
gk, _ = GatekeepModel.get_or_create(guild_id=member.guild.id)
gs: GuildSettingsModel
gs, _ = GuildSettingsModel.get_or_create(guild_id=member.guild.id)
jl: JoinLeaveModel
jl, _ = JoinLeaveModel.get_or_create(guild_id=member.guild.id)
if gk.gatekeep_method != "captcha" or not gs.use_gatekeep:
return
gkc: GatekeepCaptchasModel
gkc, _ = GatekeepCaptchasModel.get_or_create(
guild_id=member.guild.id, user_id=member.id
)
image = ImageCaptcha(width=240, height=90)
audio = AudioCaptcha()
captcha_text = "".join([random.choice(string.digits) for _ in range(7)])
gkc.captcha = captcha_text
gkc.save()
bio_image = BytesIO()
bio_audio = BytesIO()
image.write(captcha_text, bio_image, format="png")
bio_audio.write(audio.generate(captcha_text))
bio_image.seek(0)
bio_audio.seek(0)
if (
jl.message_channel is None
or (channel := await member.guild.fetch_channel(jl.message_channel)) is None
):
await member.send(
f"The server *{member.guild.name}* requires users to complete a captcha to join.\n"
"You can complete it by sending the command `/captcha <text>` in the server",
files=[
File(bio_image, file_name="captcha.png"),
File(bio_audio, file_name="captcha.wav"),
],
components=Button(
label="Regenerate",
emoji="🔃",
style=ButtonStyles.GRAY,
custom_id=f"gatekeep-captcha-regenerate:{member.id}",
),
)
return
await channel.send(
f"The server *{member.guild.name}* requires users to complete a captcha to join.\n"
"You can complete it by sending the command `/captcha <text>` in the server",
files=[
File(bio_image, file_name="captcha.png"),
File(bio_audio, file_name="captcha.wav"),
],
components=Button(
label="Regenerate",
emoji="🔃",
style=ButtonStyles.GRAY,
custom_id=f"gatekeep-captcha-regenerate:{member.id}",
),
)
@listen(events.Button)
async def on_regenerate_button(self, button: events.Button):
ctx = button.context
member = ctx.author
if (
not ctx.custom_id.startswith("gatekeep-captcha-regenerate:")
or not len(ctx.custom_id.split(":")) == 2
):
return
if ctx.author.id != int(ctx.custom_id.split(":")[1]):
await ctx.send("You are not the owner of this captcha.", ephemeral=True)
return
gkc: GatekeepCaptchasModel
gkc = GatekeepCaptchasModel.get_or_none(
guild_id=ctx.guild.id, user_id=ctx.author.id
)
if gkc is None:
await ctx.send("You have no captcha to regenerate.", ephemeral=True)
return
image = ImageCaptcha(width=240, height=90)
audio = AudioCaptcha()
captcha_text = "".join([random.choice(string.digits) for _ in range(7)])
gkc.captcha = captcha_text
gkc.save()
bio_image = BytesIO()
bio_audio = BytesIO()
image.write(captcha_text, bio_image, format="png")
bio_audio.write(audio.generate(captcha_text))
bio_image.seek(0)
bio_audio.seek(0)
msg = await ctx.send(
f"The server *{member.guild.name}* requires users to complete a captcha to join.\n"
"You can complete it by sending the command `/captcha <text>` in the server",
files=[
File(bio_image, file_name="captcha.png"),
File(bio_audio, file_name="captcha.wav"),
],
components=Button(
label="Regenerate",
emoji="🔃",
style=ButtonStyles.GRAY,
custom_id=f"gatekeep-captcha-regenerate:{member.id}",
),
)
if msg is not None:
await ctx.message.delete()
def setup(client: Client):
GatekeepModel.create_table()
GatekeepCaptchasModel.create_table()
Gatekeep(client)
logging.info("Gatekeep extension loaded")