331 lines
11 KiB
Python
331 lines
11 KiB
Python
import logging
|
||
import random, string
|
||
from io import BytesIO
|
||
from naff import (
|
||
Extension,
|
||
Client,
|
||
slash_command,
|
||
slash_option,
|
||
context_menu,
|
||
listen,
|
||
CommandTypes,
|
||
InteractionContext,
|
||
OptionTypes,
|
||
Role,
|
||
Member,
|
||
File,
|
||
SlashCommandChoice,
|
||
)
|
||
from naff.api import events
|
||
from naff.models.discord.enums import Permissions
|
||
from captcha.image import ImageCaptcha
|
||
from captcha.audio import AudioCaptcha
|
||
|
||
from database import (
|
||
GuildSettings as GuildSettingsModel,
|
||
Gatekeep as GatekeepModel,
|
||
JoinLeave as JoinLeaveModel,
|
||
GatekeepCaptchas as GatekeepCaptchasModel,
|
||
)
|
||
|
||
|
||
class Gatekeep(Extension):
|
||
def __init__(self, client: Client):
|
||
self.client = client
|
||
|
||
@slash_command(
|
||
name="gatekeep",
|
||
description="The gatekeep system",
|
||
sub_cmd_name="set-type",
|
||
sub_cmd_description="Set how Gatekeep allows users to join",
|
||
dm_permission=False,
|
||
default_member_permissions=Permissions.MANAGE_GUILD,
|
||
)
|
||
@slash_option(
|
||
name="type",
|
||
description="Type of Gatekeep",
|
||
required=True,
|
||
opt_type=OptionTypes.STRING,
|
||
choices=[
|
||
SlashCommandChoice(
|
||
name="Manual",
|
||
value="manual",
|
||
),
|
||
SlashCommandChoice(
|
||
name="Captcha",
|
||
value="captcha",
|
||
),
|
||
],
|
||
)
|
||
async def set_type_command(self, ctx: InteractionContext, type: str):
|
||
gk: GatekeepModel
|
||
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
|
||
gk.gatekeep_method = type
|
||
gk.save()
|
||
await ctx.send(f"Gatekeep type set to {type}", ephemeral=True)
|
||
|
||
@slash_command(
|
||
name="gatekeep",
|
||
description="The gatekeep system",
|
||
sub_cmd_name="set-role",
|
||
sub_cmd_description="Set the role that users gain upon approval",
|
||
dm_permission=False,
|
||
default_member_permissions=Permissions.MANAGE_GUILD,
|
||
)
|
||
@slash_option(
|
||
name="role",
|
||
description="Role to grant",
|
||
required=True,
|
||
opt_type=OptionTypes.ROLE,
|
||
)
|
||
async def set_role_command(self, ctx: InteractionContext, role: Role):
|
||
gk: GatekeepModel
|
||
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
|
||
gk.gatekeep_approve_role = role.id
|
||
gk.save()
|
||
await ctx.send(f"Gatekeep role set to {role.name}", ephemeral=True)
|
||
|
||
@slash_command(
|
||
name="gatekeep",
|
||
description="The gatekeep system",
|
||
sub_cmd_name="set-message",
|
||
sub_cmd_description="Set the message that users see when they are approved",
|
||
dm_permission=False,
|
||
default_member_permissions=Permissions.MANAGE_GUILD,
|
||
)
|
||
@slash_option(
|
||
name="message",
|
||
description="Message to send",
|
||
required=True,
|
||
opt_type=OptionTypes.STRING,
|
||
)
|
||
async def set_message_command(self, ctx: InteractionContext, message: str):
|
||
gk: GatekeepModel
|
||
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
|
||
gk.gatekeep_approve_message = message
|
||
gk.save()
|
||
await ctx.send(f"Gatekeep message set to {message}", ephemeral=True)
|
||
|
||
@slash_command(
|
||
name="approve",
|
||
description="Approve a user",
|
||
dm_permission=False,
|
||
default_member_permissions=Permissions.MANAGE_ROLES,
|
||
)
|
||
@slash_option(
|
||
name="user",
|
||
description="User to approve",
|
||
required=True,
|
||
opt_type=OptionTypes.USER,
|
||
)
|
||
async def approve_command(self, ctx: InteractionContext, user: Member):
|
||
gk: GatekeepModel
|
||
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
|
||
jl: JoinLeaveModel
|
||
jl, _ = JoinLeaveModel.get_or_create(guild_id=ctx.guild.id)
|
||
await user.add_role(int(gk.gatekeep_approve_role))
|
||
|
||
welcome_channel = not jl.message_channel is None
|
||
|
||
if gk.gatekeep_approve_message is None:
|
||
await ctx.send(
|
||
f"{user.mention} has been approved.\nNB: No approval message has been sent.",
|
||
ephemeral=True,
|
||
)
|
||
return
|
||
|
||
if not welcome_channel:
|
||
await ctx.send(
|
||
f"{user.mention} has been approved.\nNB: No welcome channel has been set – attempting to DM {user.mention}",
|
||
ephemeral=True,
|
||
)
|
||
await user.send(
|
||
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
|
||
)
|
||
return
|
||
|
||
channel = await ctx.guild.fetch_channel(jl.message_channel)
|
||
if not channel:
|
||
await ctx.send(
|
||
f"{user.mention} has been approved.\nNB: No welcome channel has been set – attempting to DM {user.mention}",
|
||
ephemeral=True,
|
||
)
|
||
await user.send(
|
||
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
|
||
)
|
||
return
|
||
|
||
await channel.send(
|
||
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
|
||
)
|
||
await ctx.send(f"{user.mention} has been approved.", ephemeral=True)
|
||
|
||
@context_menu(
|
||
name="approve",
|
||
dm_permission=False,
|
||
default_member_permissions=Permissions.MANAGE_ROLES,
|
||
context_type=CommandTypes.USER,
|
||
)
|
||
async def approve_context_menu(self, ctx: InteractionContext, user: Member):
|
||
gk: GatekeepModel
|
||
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
|
||
jl: JoinLeaveModel
|
||
jl, _ = JoinLeaveModel.get_or_create(guild_id=ctx.guild.id)
|
||
user.add_role(int(gk.gatekeep_approve_role))
|
||
|
||
welcome_channel = not jl.message_channel is None
|
||
|
||
if gk.gatekeep_approve_message is None:
|
||
await ctx.send(
|
||
f"{user.mention} has been approved.\nNB: No approval message has been sent.",
|
||
ephemeral=True,
|
||
)
|
||
return
|
||
|
||
if not welcome_channel:
|
||
await ctx.send(
|
||
f"{user.mention} has been approved.\nNB: No welcome channel has been set – attempting to DM {user.mention}",
|
||
ephemeral=True,
|
||
)
|
||
await user.send(
|
||
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
|
||
)
|
||
return
|
||
|
||
channel = await ctx.guild.fetch_channel(jl.message_channel)
|
||
if not channel:
|
||
await ctx.send(
|
||
f"{user.mention} has been approved.\nNB: No welcome channel has been set – attempting to DM {user.mention}",
|
||
ephemeral=True,
|
||
)
|
||
await user.send(
|
||
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
|
||
)
|
||
return
|
||
|
||
await channel.send(
|
||
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
|
||
)
|
||
|
||
@listen(events.MessageReactionAdd)
|
||
async def on_reaction_add(self, reaction: events.MessageReactionAdd):
|
||
if not reaction.emoji.name in [
|
||
"✅",
|
||
"☑",
|
||
"✔",
|
||
] or not reaction.author.has_permission(Permissions.MANAGE_ROLES):
|
||
return
|
||
|
||
gk: GatekeepModel
|
||
gk, _ = GatekeepModel.get_or_create(guild_id=reaction.guild.id)
|
||
jl: JoinLeaveModel
|
||
jl, _ = JoinLeaveModel.get_or_create(guild_id=reaction.guild.id)
|
||
|
||
if reaction.message.channel.id != jl.message_channel:
|
||
return
|
||
|
||
await reaction.message.author.add_role(int(gk.gatekeep_approve_role))
|
||
await reaction.message.channel.send(
|
||
str(gk.gatekeep_approve_message).format(
|
||
member=reaction.message.author, guild=reaction.guild
|
||
)
|
||
)
|
||
|
||
@slash_command(
|
||
name="captcha",
|
||
description="Complete a captcha",
|
||
dm_permission=False,
|
||
)
|
||
@slash_option(
|
||
name="captcha",
|
||
description="The captcha solution",
|
||
required=True,
|
||
opt_type=OptionTypes.STRING,
|
||
)
|
||
async def captcha_command(self, ctx: InteractionContext, captcha: str):
|
||
|
||
gk: GatekeepModel = GatekeepModel.get_or_none(
|
||
GatekeepModel.guild_id == ctx.guild.id,
|
||
)
|
||
if gk is None:
|
||
await ctx.send("No gatekeep set", ephemeral=True)
|
||
return
|
||
|
||
gkc: GatekeepCaptchasModel = GatekeepCaptchasModel.get_or_none(
|
||
GatekeepCaptchasModel.guild_id == ctx.guild.id,
|
||
GatekeepCaptchasModel.user_id == ctx.author.id,
|
||
)
|
||
if gkc is None:
|
||
await ctx.send("You have no captcha to complete.", ephemeral=True)
|
||
return
|
||
if gkc.captcha != captcha:
|
||
await ctx.send("Your captcha was incorrect.", ephemeral=True)
|
||
return
|
||
|
||
gkc.delete()
|
||
await ctx.author.add_role(int(gk.gatekeep_approve_role))
|
||
await ctx.send(
|
||
str(gk.gatekeep_approve_message).format(member=ctx.author, guild=ctx.guild),
|
||
ephemeral=False,
|
||
)
|
||
|
||
@listen(events.MemberAdd)
|
||
async def on_member_join(self, member_add: events.MemberAdd):
|
||
member = member_add.member
|
||
|
||
gk: GatekeepModel
|
||
gk, _ = GatekeepModel.get_or_create(guild_id=member.guild.id)
|
||
gs: GuildSettingsModel
|
||
gs, _ = GuildSettingsModel.get_or_create(guild_id=member.guild.id)
|
||
jl: JoinLeaveModel
|
||
jl, _ = JoinLeaveModel.get_or_create(guild_id=member.guild.id)
|
||
if gk.gatekeep_method != "captcha" or not gs.use_gatekeep:
|
||
return
|
||
|
||
gkc: GatekeepCaptchasModel
|
||
gkc, _ = GatekeepCaptchasModel.get_or_create(
|
||
guild_id=member.guild.id, user_id=member.id
|
||
)
|
||
image = ImageCaptcha(width=240, height=90)
|
||
audio = AudioCaptcha()
|
||
captcha_text = "".join([random.choice(string.digits) for _ in range(7)])
|
||
gkc.captcha = captcha_text
|
||
gkc.save()
|
||
|
||
bio_image = BytesIO()
|
||
bio_audio = BytesIO()
|
||
image.write(captcha_text, bio_image, format="png")
|
||
bio_audio.write(audio.generate(captcha_text))
|
||
bio_image.seek(0)
|
||
bio_audio.seek(0)
|
||
|
||
if (
|
||
jl.message_channel is None
|
||
or (channel := await member.guild.fetch_channel(jl.message_channel)) is None
|
||
):
|
||
await member.send(
|
||
f"The server *{member.guild.name}* requires users to complete a captcha to join.\n"
|
||
"You can complete it by sending the command `/captcha <text>` in the server",
|
||
files=[
|
||
File(bio_image, file_name="captcha.png"),
|
||
File(bio_audio, file_name="captcha.wav"),
|
||
],
|
||
)
|
||
return
|
||
|
||
await channel.send(
|
||
f"The server *{member.guild.name}* requires users to complete a captcha to join.\n"
|
||
"You can complete it by sending the command `/captcha <text>` in the server",
|
||
files=[
|
||
File(bio_image, file_name="captcha.png"),
|
||
File(bio_audio, file_name="captcha.wav"),
|
||
],
|
||
)
|
||
|
||
|
||
def setup(client: Client):
|
||
GatekeepModel.create_table()
|
||
GatekeepCaptchasModel.create_table()
|
||
Gatekeep(client)
|
||
logging.info("Gatekeep extension loaded")
|