import logging import random import string from io import BytesIO from naff import ( Extension, Client, slash_command, slash_option, context_menu, listen, CommandTypes, InteractionContext, OptionTypes, Role, Member, File, SlashCommandChoice, Button, ButtonStyles, ) from naff.api import events from naff.models.discord.enums import Permissions from captcha.image import ImageCaptcha from captcha.audio import AudioCaptcha from database import ( GuildSettings as GuildSettingsModel, Gatekeep as GatekeepModel, JoinLeave as JoinLeaveModel, GatekeepCaptchas as GatekeepCaptchasModel, ) class Gatekeep(Extension): def __init__(self, client: Client): self.client = client @slash_command( name="gatekeep", description="The gatekeep system", sub_cmd_name="set-type", sub_cmd_description="Set how Gatekeep allows users to join", dm_permission=False, default_member_permissions=Permissions.MANAGE_GUILD, ) @slash_option( name="type", description="Type of Gatekeep", required=True, opt_type=OptionTypes.STRING, choices=[ SlashCommandChoice( name="Manual", value="manual", ), SlashCommandChoice( name="Captcha", value="captcha", ), ], ) async def set_type_command(self, ctx: InteractionContext, type_: str): gk: GatekeepModel gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id) gk.gatekeep_method = type_ gk.save() await ctx.send(f"Gatekeep type set to {type_}", ephemeral=True) @slash_command( name="gatekeep", description="The gatekeep system", sub_cmd_name="set-role", sub_cmd_description="Set the role that users gain upon approval", dm_permission=False, default_member_permissions=Permissions.MANAGE_GUILD, ) @slash_option( name="role", description="Role to grant", required=True, opt_type=OptionTypes.ROLE, ) async def set_role_command(self, ctx: InteractionContext, role: Role): gk: GatekeepModel gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id) gk.gatekeep_approve_role = role.id gk.save() await ctx.send(f"Gatekeep role set to {role.name}", ephemeral=True) @slash_command( name="gatekeep", description="The gatekeep system", sub_cmd_name="set-message", sub_cmd_description="Set the message that users see when they are approved", dm_permission=False, default_member_permissions=Permissions.MANAGE_GUILD, ) @slash_option( name="message", description="Message to send", required=True, opt_type=OptionTypes.STRING, ) async def set_message_command(self, ctx: InteractionContext, message: str): gk: GatekeepModel gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id) gk.gatekeep_approve_message = message gk.save() await ctx.send(f"Gatekeep message set to {message}", ephemeral=True) @slash_command( name="approve", description="Approve a user", dm_permission=False, default_member_permissions=Permissions.MANAGE_ROLES, ) @slash_option( name="user", description="User to approve", required=True, opt_type=OptionTypes.USER, ) async def approve_command(self, ctx: InteractionContext, user: Member): gk: GatekeepModel gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id) jl: JoinLeaveModel jl, _ = JoinLeaveModel.get_or_create(guild_id=ctx.guild.id) await user.add_role(int(gk.gatekeep_approve_role)) # Check if a welcome channel is set welcome_channel = not jl.message_channel is None # If there is no approval message set, inform the issuer privately. if gk.gatekeep_approve_message is None: await ctx.send( f"{user.mention} has been approved.\nNB: No approval message has been sent.", ephemeral=True, ) return # If there is no welcome channel set, attempt to DM the approval message to the user. if not welcome_channel: await ctx.send( f"{user.mention} has been approved.\nNB: No welcome channel has been set – attempting to DM {user.mention}", ephemeral=True, ) await user.send( str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild) ) return # DM the user if the bot fails to retrieve the welcome channel. channel = await ctx.guild.fetch_channel(jl.message_channel) if not channel: await ctx.send( f"{user.mention} has been approved.\nNB: No welcome channel has been set – attempting to DM {user.mention}", ephemeral=True, ) await user.send( str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild) ) return # If none of the above occur, finally send the approval message to the welcome channel. await channel.send( str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild) ) await ctx.send(f"{user.mention} has been approved.", ephemeral=True) @context_menu( name="Approve User", dm_permission=False, default_member_permissions=Permissions.MANAGE_ROLES, context_type=CommandTypes.USER, ) async def approve_context_menu(self, ctx: InteractionContext): user: Member = ctx.target gk: GatekeepModel gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id) jl: JoinLeaveModel jl, _ = JoinLeaveModel.get_or_create(guild_id=ctx.guild.id) await user.add_role(int(gk.gatekeep_approve_role)) welcome_channel = not jl.message_channel is None if gk.gatekeep_approve_message is None: await ctx.send( f"{user.mention} has been approved.\nNB: No approval message has been sent.", ephemeral=True, ) return if not welcome_channel: await ctx.send( (f"{user.mention} has been approved.\n" f"NB: No welcome channel has been set – attempting to DM {user.mention}"), ephemeral=True, ) await user.send( str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild) ) return channel = await ctx.guild.fetch_channel(jl.message_channel) if not channel: await ctx.send( (f"{user.mention} has been approved.\n" f"NB: No welcome channel has been set – attempting to DM {user.mention}"), ephemeral=True, ) await user.send( str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild) ) return await channel.send( str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild) ) await ctx.send(f"{user.mention} has been approved.", ephemeral=True) # Allow the use of a reaction to approve a user. # This is mainly for compatibility with NLL. # The permission thingy should probably be reworked, as it currently allows anyone # with the manage roles permission to use this. # TODO: Rewrite this to require a specific role. @listen(events.MessageReactionAdd) async def on_reaction_add(self, reaction: events.MessageReactionAdd): if not reaction.emoji.name in [ "✅", "☑", "✔", ] or not reaction.author.has_permission(Permissions.MANAGE_ROLES): return gk: GatekeepModel gk, _ = GatekeepModel.get_or_create(guild_id=reaction.guild.id) jl: JoinLeaveModel jl, _ = JoinLeaveModel.get_or_create(guild_id=reaction.guild.id) if reaction.message.channel.id != jl.message_channel: return await reaction.message.author.add_role(int(gk.gatekeep_approve_role)) await reaction.message.channel.send( str(gk.gatekeep_approve_message).format( member=reaction.message.author, guild=reaction.guild ) ) @slash_command( name="captcha", description="Complete a captcha", dm_permission=False, ) @slash_option( name="captcha", description="The captcha solution", required=True, opt_type=OptionTypes.STRING, ) async def captcha_command(self, ctx: InteractionContext, captcha: str): gk: GatekeepModel = GatekeepModel.get_or_none( GatekeepModel.guild_id == ctx.guild.id, ) if gk is None: await ctx.send("No gatekeep set", ephemeral=True) return gkc: GatekeepCaptchasModel = GatekeepCaptchasModel.get_or_none( GatekeepCaptchasModel.guild_id == ctx.guild.id, GatekeepCaptchasModel.user_id == ctx.author.id, ) if gkc is None: await ctx.send("You have no captcha to complete.", ephemeral=True) return if gkc.captcha != captcha: await ctx.send("Your captcha was incorrect.", ephemeral=True) return gkc.delete_instance() await ctx.author.add_role(int(gk.gatekeep_approve_role)) await ctx.send( str(gk.gatekeep_approve_message).format(member=ctx.author, guild=ctx.guild), ephemeral=False, ) @listen(events.MemberAdd) async def on_member_join(self, member_add: events.MemberAdd): member = member_add.member gk: GatekeepModel gk, _ = GatekeepModel.get_or_create(guild_id=member.guild.id) gs: GuildSettingsModel gs, _ = GuildSettingsModel.get_or_create(guild_id=member.guild.id) jl: JoinLeaveModel jl, _ = JoinLeaveModel.get_or_create(guild_id=member.guild.id) if gk.gatekeep_method != "captcha" or not gs.use_gatekeep: return gkc: GatekeepCaptchasModel gkc, _ = GatekeepCaptchasModel.get_or_create( guild_id=member.guild.id, user_id=member.id ) image = ImageCaptcha(width=240, height=90) audio = AudioCaptcha() captcha_text = "".join([random.choice(string.digits) for _ in range(7)]) gkc.captcha = captcha_text gkc.save() bio_image = BytesIO() bio_audio = BytesIO() image.write(captcha_text, bio_image, format="png") bio_audio.write(audio.generate(captcha_text)) bio_image.seek(0) bio_audio.seek(0) if ( jl.message_channel is None or (channel := await member.guild.fetch_channel(jl.message_channel)) is None ): await member.send( f"The server *{member.guild.name}* requires users to complete a captcha to join.\n" "You can complete it by sending the command `/captcha ` in the server", files=[ File(bio_image, file_name="captcha.png"), File(bio_audio, file_name="captcha.wav"), ], components=Button( label="Regenerate", emoji="🔃", style=ButtonStyles.GRAY, custom_id=f"gatekeep-captcha-regenerate:{member.id}", ), ) return await channel.send( f"The server *{member.guild.name}* requires users to complete a captcha to join.\n" "You can complete it by sending the command `/captcha ` in the server", files=[ File(bio_image, file_name="captcha.png"), File(bio_audio, file_name="captcha.wav"), ], components=Button( label="Regenerate", emoji="🔃", style=ButtonStyles.GRAY, custom_id=f"gatekeep-captcha-regenerate:{member.id}", ), ) @listen(events.Button) async def on_regenerate_button(self, button: events.Button): ctx = button.context member = ctx.author if ( not ctx.custom_id.startswith("gatekeep-captcha-regenerate:") or not len(ctx.custom_id.split(":")) == 2 ): return if ctx.author.id != int(ctx.custom_id.split(":")[1]): await ctx.send("You are not the owner of this captcha.", ephemeral=True) return gkc: GatekeepCaptchasModel gkc = GatekeepCaptchasModel.get_or_none( guild_id=ctx.guild.id, user_id=ctx.author.id ) if gkc is None: await ctx.send("You have no captcha to regenerate.", ephemeral=True) return image = ImageCaptcha(width=240, height=90) audio = AudioCaptcha() captcha_text = "".join([random.choice(string.digits) for _ in range(7)]) gkc.captcha = captcha_text gkc.save() bio_image = BytesIO() bio_audio = BytesIO() image.write(captcha_text, bio_image, format="png") bio_audio.write(audio.generate(captcha_text)) bio_image.seek(0) bio_audio.seek(0) msg = await ctx.send( f"The server *{member.guild.name}* requires users to complete a captcha to join.\n" "You can complete it by sending the command `/captcha ` in the server", files=[ File(bio_image, file_name="captcha.png"), File(bio_audio, file_name="captcha.wav"), ], components=Button( label="Regenerate", emoji="🔃", style=ButtonStyles.GRAY, custom_id=f"gatekeep-captcha-regenerate:{member.id}", ), ) if msg is not None: await ctx.message.delete() def setup(client: Client): GatekeepModel.create_table() GatekeepCaptchasModel.create_table() Gatekeep(client) logging.info("Gatekeep extension loaded")