2022-08-04 03:52:53 +02:00
|
|
|
|
import logging
|
2022-10-15 03:35:42 +02:00
|
|
|
|
import random
|
|
|
|
|
import string
|
2022-08-04 03:52:53 +02:00
|
|
|
|
from io import BytesIO
|
|
|
|
|
from naff import (
|
|
|
|
|
Extension,
|
|
|
|
|
Client,
|
|
|
|
|
slash_command,
|
|
|
|
|
slash_option,
|
|
|
|
|
context_menu,
|
|
|
|
|
listen,
|
|
|
|
|
CommandTypes,
|
|
|
|
|
InteractionContext,
|
|
|
|
|
OptionTypes,
|
|
|
|
|
Role,
|
|
|
|
|
Member,
|
|
|
|
|
File,
|
|
|
|
|
SlashCommandChoice,
|
2022-08-04 14:34:45 +02:00
|
|
|
|
Button,
|
|
|
|
|
ButtonStyles,
|
2022-08-04 03:52:53 +02:00
|
|
|
|
)
|
|
|
|
|
from naff.api import events
|
|
|
|
|
from naff.models.discord.enums import Permissions
|
|
|
|
|
from captcha.image import ImageCaptcha
|
|
|
|
|
from captcha.audio import AudioCaptcha
|
|
|
|
|
|
|
|
|
|
from database import (
|
|
|
|
|
GuildSettings as GuildSettingsModel,
|
|
|
|
|
Gatekeep as GatekeepModel,
|
|
|
|
|
JoinLeave as JoinLeaveModel,
|
|
|
|
|
GatekeepCaptchas as GatekeepCaptchasModel,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Gatekeep(Extension):
    """Gatekeep extension: new members must be approved before gaining a role.

    Two approval methods are supported, selected per guild with
    ``/gatekeep set-type``:

    * ``manual``  - a moderator approves the member via ``/approve``, the
      "Approve User" context menu, or a check-mark reaction in the welcome
      channel.
    * ``captcha`` - the member is sent an image/audio captcha on join and
      approves themselves with ``/captcha <text>``.

    On approval the member receives the role configured with
    ``/gatekeep set-role`` and, if configured, the message set with
    ``/gatekeep set-message``.
    """

    # Emoji names that count as an approval reaction.
    _APPROVE_EMOJIS = ("✅", "☑", "✔")
    # Number of digits in a generated captcha.
    _CAPTCHA_LENGTH = 7
    # Prefix of the custom id carried by the "Regenerate" button; the full id
    # is "<prefix>:<member id>".
    _REGENERATE_ID_PREFIX = "gatekeep-captcha-regenerate"

    def __init__(self, client: Client):
        self.client = client

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _render_approve_message(gk: GatekeepModel, member, guild) -> str:
        """Return the guild's approval message with placeholders filled in.

        The stored message is used as a ``str.format`` template that receives
        ``member`` and ``guild`` as keyword arguments. If the template is
        malformed (unknown placeholder, unbalanced braces, missing attribute)
        the raw text is returned instead, so a bad template cannot abort an
        approval.
        """
        template = str(gk.gatekeep_approve_message)
        # NOTE(review): str.format on an admin-supplied template allows
        # attribute traversal on `member` / `guild` (e.g. "{member._client}").
        # Consider a restricted formatter if that exposure matters.
        try:
            return template.format(member=member, guild=guild)
        except (AttributeError, IndexError, KeyError, ValueError):
            logging.warning(
                "Gatekeep approval message for guild %s is not a valid template",
                gk.guild_id,
            )
            return template

    async def _approve(self, ctx: InteractionContext, user: Member):
        """Grant ``user`` the approval role and deliver the approval message.

        Shared implementation of ``/approve`` and the "Approve User" context
        menu. The issuer always receives an ephemeral status reply.
        """
        gk: GatekeepModel
        gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
        jl: JoinLeaveModel
        jl, _ = JoinLeaveModel.get_or_create(guild_id=ctx.guild.id)

        # Without a configured role, int(None) would raise; tell the issuer
        # what is missing instead.
        if not gk.gatekeep_approve_role:
            await ctx.send(
                "No approval role has been set. Use `/gatekeep set-role` first.",
                ephemeral=True,
            )
            return

        await user.add_role(int(gk.gatekeep_approve_role))

        # If there is no approval message set, inform the issuer privately.
        if gk.gatekeep_approve_message is None:
            await ctx.send(
                f"{user.mention} has been approved.\nNB: No approval message has been sent.",
                ephemeral=True,
            )
            return

        approve_message = self._render_approve_message(gk, user, ctx.guild)

        # Look up the welcome channel, if one is configured.
        channel = None
        if jl.message_channel is not None:
            channel = await ctx.guild.fetch_channel(jl.message_channel)

        # No welcome channel set, or it could not be retrieved: attempt to DM
        # the approval message to the user instead.
        if not channel:
            await ctx.send(
                f"{user.mention} has been approved.\n"
                f"NB: No welcome channel has been set – attempting to DM {user.mention}",
                ephemeral=True,
            )
            await user.send(approve_message)
            return

        # Otherwise send the approval message to the welcome channel.
        await channel.send(approve_message)
        await ctx.send(f"{user.mention} has been approved.", ephemeral=True)

    def _new_captcha_payload(
        self, gkc: GatekeepCaptchasModel, member_id, guild_name: str
    ) -> dict:
        """Generate a fresh captcha, store it on ``gkc`` and build the message.

        Returns the keyword arguments for a ``send`` call: the instruction
        text, the captcha as PNG and WAV attachments, and a "Regenerate"
        button whose custom id is tied to ``member_id``.
        """
        captcha_text = "".join(random.choices(string.digits, k=self._CAPTCHA_LENGTH))
        gkc.captcha = captcha_text
        gkc.save()

        bio_image = BytesIO()
        ImageCaptcha(width=240, height=90).write(captcha_text, bio_image, format="png")
        bio_image.seek(0)
        # A BytesIO built from initial bytes already starts at position 0.
        bio_audio = BytesIO(AudioCaptcha().generate(captcha_text))

        return {
            "content": (
                f"The server *{guild_name}* requires users to complete a captcha to join.\n"
                "You can complete it by sending the command `/captcha <text>` in the server"
            ),
            "files": [
                File(bio_image, file_name="captcha.png"),
                File(bio_audio, file_name="captcha.wav"),
            ],
            "components": Button(
                label="Regenerate",
                emoji="🔃",
                style=ButtonStyles.GRAY,
                custom_id=f"{self._REGENERATE_ID_PREFIX}:{member_id}",
            ),
        }

    # ------------------------------------------------------------------
    # Configuration commands
    # ------------------------------------------------------------------

    @slash_command(
        name="gatekeep",
        description="The gatekeep system",
        sub_cmd_name="set-type",
        sub_cmd_description="Set how Gatekeep allows users to join",
        dm_permission=False,
        default_member_permissions=Permissions.MANAGE_GUILD,
    )
    @slash_option(
        name="type",
        description="Type of Gatekeep",
        required=True,
        opt_type=OptionTypes.STRING,
        choices=[
            SlashCommandChoice(
                name="Manual",
                value="manual",
            ),
            SlashCommandChoice(
                name="Captcha",
                value="captcha",
            ),
        ],
    )
    async def set_type_command(self, ctx: InteractionContext, type_: str):
        """Store the gatekeep method ("manual" or "captcha") for this guild."""
        # NOTE(review): the option is registered as "type" while the parameter
        # is named `type_` - confirm the framework maps one onto the other.
        gk: GatekeepModel
        gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
        gk.gatekeep_method = type_
        gk.save()
        await ctx.send(f"Gatekeep type set to {type_}", ephemeral=True)

    @slash_command(
        name="gatekeep",
        description="The gatekeep system",
        sub_cmd_name="set-role",
        sub_cmd_description="Set the role that users gain upon approval",
        dm_permission=False,
        default_member_permissions=Permissions.MANAGE_GUILD,
    )
    @slash_option(
        name="role",
        description="Role to grant",
        required=True,
        opt_type=OptionTypes.ROLE,
    )
    async def set_role_command(self, ctx: InteractionContext, role: Role):
        """Store the role that is granted to members when they are approved."""
        gk: GatekeepModel
        gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
        gk.gatekeep_approve_role = role.id
        gk.save()
        await ctx.send(f"Gatekeep role set to {role.name}", ephemeral=True)

    @slash_command(
        name="gatekeep",
        description="The gatekeep system",
        sub_cmd_name="set-message",
        sub_cmd_description="Set the message that users see when they are approved",
        dm_permission=False,
        default_member_permissions=Permissions.MANAGE_GUILD,
    )
    @slash_option(
        name="message",
        description="Message to send",
        required=True,
        opt_type=OptionTypes.STRING,
    )
    async def set_message_command(self, ctx: InteractionContext, message: str):
        """Store the approval message template for this guild.

        The template is later formatted with ``member`` and ``guild`` keyword
        arguments when a member is approved.
        """
        gk: GatekeepModel
        gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
        gk.gatekeep_approve_message = message
        gk.save()
        await ctx.send(f"Gatekeep message set to {message}", ephemeral=True)

    # ------------------------------------------------------------------
    # Manual approval
    # ------------------------------------------------------------------

    @slash_command(
        name="approve",
        description="Approve a user",
        dm_permission=False,
        default_member_permissions=Permissions.MANAGE_ROLES,
    )
    @slash_option(
        name="user",
        description="User to approve",
        required=True,
        opt_type=OptionTypes.USER,
    )
    async def approve_command(self, ctx: InteractionContext, user: Member):
        """Approve ``user`` via the ``/approve`` slash command."""
        await self._approve(ctx, user)

    @context_menu(
        name="Approve User",
        dm_permission=False,
        default_member_permissions=Permissions.MANAGE_ROLES,
        context_type=CommandTypes.USER,
    )
    async def approve_context_menu(self, ctx: InteractionContext):
        """Approve the targeted user via the "Approve User" context menu."""
        user: Member = ctx.target
        await self._approve(ctx, user)

    # Allow the use of a reaction to approve a user.
    # This is mainly for compatibility with NLL.
    # The permission check should probably be reworked, as it currently allows
    # anyone with the manage roles permission to use this.
    # TODO: Rewrite this to require a specific role.
    @listen(events.MessageReactionAdd)
    async def on_reaction_add(self, reaction: events.MessageReactionAdd):
        """Approve a message's author when a moderator reacts with a check mark.

        Only reactions in the guild's configured welcome channel, made by a
        user holding the manage-roles permission, are acted upon.
        """
        if reaction.emoji.name not in self._APPROVE_EMOJIS:
            return
        if not reaction.author.has_permission(Permissions.MANAGE_ROLES):
            return

        gk: GatekeepModel
        gk, _ = GatekeepModel.get_or_create(guild_id=reaction.guild.id)
        jl: JoinLeaveModel
        jl, _ = JoinLeaveModel.get_or_create(guild_id=reaction.guild.id)

        # NOTE(review): assumes message_channel is stored with a type that
        # compares equal to the channel id - verify against the model.
        if reaction.message.channel.id != jl.message_channel:
            return

        # Nothing can be granted until a role has been configured.
        if not gk.gatekeep_approve_role:
            return

        await reaction.message.author.add_role(int(gk.gatekeep_approve_role))

        # Without a configured message there is nothing to announce; sending
        # str(None) would post the literal text "None".
        if gk.gatekeep_approve_message is None:
            return

        await reaction.message.channel.send(
            self._render_approve_message(gk, reaction.message.author, reaction.guild)
        )

    # ------------------------------------------------------------------
    # Captcha approval
    # ------------------------------------------------------------------

    @slash_command(
        name="captcha",
        description="Complete a captcha",
        dm_permission=False,
    )
    @slash_option(
        name="captcha",
        description="The captcha solution",
        required=True,
        opt_type=OptionTypes.STRING,
    )
    async def captcha_command(self, ctx: InteractionContext, captcha: str):
        """Check the caller's captcha solution and approve them if it matches."""
        gk: GatekeepModel = GatekeepModel.get_or_none(
            GatekeepModel.guild_id == ctx.guild.id,
        )
        if gk is None:
            await ctx.send("No gatekeep set", ephemeral=True)
            return

        gkc: GatekeepCaptchasModel = GatekeepCaptchasModel.get_or_none(
            GatekeepCaptchasModel.guild_id == ctx.guild.id,
            GatekeepCaptchasModel.user_id == ctx.author.id,
        )
        if gkc is None:
            await ctx.send("You have no captcha to complete.", ephemeral=True)
            return

        # Tolerate stray whitespace around the typed solution.
        if gkc.captcha != captcha.strip():
            await ctx.send("Your captcha was incorrect.", ephemeral=True)
            return

        if not gk.gatekeep_approve_role:
            await ctx.send("No approval role has been set for this server.", ephemeral=True)
            return

        # Grant the role first and only then discard the captcha, so a failed
        # role grant does not leave the member without a captcha to retry.
        await ctx.author.add_role(int(gk.gatekeep_approve_role))
        gkc.delete_instance()

        if gk.gatekeep_approve_message is None:
            await ctx.send("Captcha completed – you have been approved.", ephemeral=True)
            return

        await ctx.send(
            self._render_approve_message(gk, ctx.author, ctx.guild),
            ephemeral=False,
        )

    @listen(events.MemberAdd)
    async def on_member_join(self, member_add: events.MemberAdd):
        """Send a captcha to a newly joined member when captcha gatekeep is on.

        The captcha goes to the configured welcome channel, or to the member's
        DMs when no channel is configured or it cannot be retrieved.
        """
        member = member_add.member

        gk: GatekeepModel
        gk, _ = GatekeepModel.get_or_create(guild_id=member.guild.id)
        gs: GuildSettingsModel
        gs, _ = GuildSettingsModel.get_or_create(guild_id=member.guild.id)
        jl: JoinLeaveModel
        jl, _ = JoinLeaveModel.get_or_create(guild_id=member.guild.id)
        if gk.gatekeep_method != "captcha" or not gs.use_gatekeep:
            return

        gkc: GatekeepCaptchasModel
        gkc, _ = GatekeepCaptchasModel.get_or_create(
            guild_id=member.guild.id, user_id=member.id
        )
        payload = self._new_captcha_payload(gkc, member.id, member.guild.name)

        channel = None
        if jl.message_channel is not None:
            channel = await member.guild.fetch_channel(jl.message_channel)

        target = member if channel is None else channel
        await target.send(**payload)

    @listen(events.ButtonPressed)
    async def on_regenerate_button(self, button: events.ButtonPressed):
        """Replace a member's captcha when they press the "Regenerate" button.

        Button presses with an unrelated custom id are ignored. Only the
        member the captcha was issued to may regenerate it.
        """
        ctx = button.ctx

        id_parts = ctx.custom_id.split(":")
        if len(id_parts) != 2 or id_parts[0] != self._REGENERATE_ID_PREFIX:
            return
        if not id_parts[1].isdigit():
            return

        if ctx.author.id != int(id_parts[1]):
            await ctx.send("You are not the owner of this captcha.", ephemeral=True)
            return

        # The captcha may have been delivered by DM (see on_member_join), where
        # no guild is attached to the interaction; the custom id does not
        # carry the guild, so the captcha cannot be looked up from there.
        if ctx.guild is None:
            await ctx.send(
                "This captcha can only be regenerated from within the server.",
                ephemeral=True,
            )
            return

        gkc: GatekeepCaptchasModel
        gkc = GatekeepCaptchasModel.get_or_none(
            guild_id=ctx.guild.id, user_id=ctx.author.id
        )
        if gkc is None:
            await ctx.send("You have no captcha to regenerate.", ephemeral=True)
            return

        payload = self._new_captcha_payload(gkc, ctx.author.id, ctx.guild.name)
        msg = await ctx.send(**payload)

        # Remove the message holding the now-outdated captcha.
        if msg is not None:
            await ctx.message.delete()
|
|
|
|
|
|
|
|
|
|
|
2022-08-04 03:52:53 +02:00
|
|
|
|
def setup(client: Client):
    """Extension entry point: prepare storage, then register the extension.

    Creates the tables backing the gatekeep models before instantiating
    ``Gatekeep`` with ``client``, and logs once loading has finished.
    """
    for model in (GatekeepModel, GatekeepCaptchasModel):
        model.create_table()

    Gatekeep(client)
    logging.info("Gatekeep extension loaded")
|