# Source: Heimdallr/heimdallr/commands/gatekeep.py
import logging
import random
import secrets
import string
from io import BytesIO

from captcha.audio import AudioCaptcha
from captcha.image import ImageCaptcha
from naff import (
    Extension,
    Client,
    slash_command,
    slash_option,
    context_menu,
    listen,
    CommandTypes,
    InteractionContext,
    OptionTypes,
    Role,
    Member,
    File,
    SlashCommandChoice,
    Button,
    ButtonStyles,
)
from naff.api import events
from naff.models.discord.enums import Permissions

from database import (
    GuildSettings as GuildSettingsModel,
    Gatekeep as GatekeepModel,
    JoinLeave as JoinLeaveModel,
    GatekeepCaptchas as GatekeepCaptchasModel,
)


class Gatekeep(Extension):
    """Gatekeeping commands and listeners.

    Members are admitted either manually (``/approve``, the "Approve User"
    context menu, or a reaction in the configured welcome channel) or by
    solving a captcha with ``/captcha``. Per-guild configuration is read from
    ``GatekeepModel``, ``JoinLeaveModel`` and ``GuildSettingsModel``.
    """

    # Number of digits in a generated captcha solution.
    _CAPTCHA_LENGTH = 7
    # Prefix of the "Regenerate" button's custom id; the captcha owner's user
    # id follows the colon.
    _REGENERATE_PREFIX = "gatekeep-captcha-regenerate"

    def __init__(self, client: Client):
        self.client = client

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @classmethod
    def _new_captcha_text(cls) -> str:
        """Return a fresh captcha solution made of random digits."""
        # `secrets` instead of `random`: the solution gates server access, so
        # it should not come from a predictable PRNG.
        return "".join(
            secrets.choice(string.digits) for _ in range(cls._CAPTCHA_LENGTH)
        )

    @staticmethod
    def _render_approval_message(template, member, guild) -> str:
        """Fill the configured approval template for *member* in *guild*."""
        # NOTE(review): str.format lets the admin-supplied template reach
        # arbitrary attributes of `member`/`guild` (e.g. "{member.x.y}").
        # Consider restricting the available placeholders.
        return str(template).format(member=member, guild=guild)

    async def _send_captcha(self, target, guild_name, member_id, captcha_text):
        """Render *captcha_text* as image and audio and send it to *target*.

        Args:
            target: Anything with an awaitable ``send`` (member, channel or
                interaction context).
            guild_name: Name shown in the prompt text.
            member_id: Id embedded in the Regenerate button's custom id.
            captcha_text: The solution to render.

        Returns:
            Whatever ``target.send`` returns.
        """
        image_buffer = BytesIO()
        ImageCaptcha(width=240, height=90).write(
            captcha_text, image_buffer, format="png"
        )
        image_buffer.seek(0)

        audio_buffer = BytesIO()
        audio_buffer.write(AudioCaptcha().generate(captcha_text))
        audio_buffer.seek(0)

        return await target.send(
            f"The server *{guild_name}* requires users to complete a captcha to join.\n"
            "You can complete it by sending the command `/captcha <text>` in the server",
            files=[
                File(image_buffer, file_name="captcha.png"),
                File(audio_buffer, file_name="captcha.wav"),
            ],
            components=Button(
                label="Regenerate",
                emoji="🔃",
                style=ButtonStyles.GRAY,
                custom_id=f"{self._REGENERATE_PREFIX}:{member_id}",
            ),
        )

    async def _approve_member(self, ctx: InteractionContext, user: Member) -> None:
        """Grant the approval role to *user* and deliver the approval message.

        Shared by the ``/approve`` slash command and the "Approve User"
        context menu. The approval message goes to the configured welcome
        channel; if none is set or it cannot be fetched, the user is DMed
        instead. The issuer always receives an ephemeral status reply.
        """
        gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
        jl, _ = JoinLeaveModel.get_or_create(guild_id=ctx.guild.id)

        # Without a configured role `int(None)` would raise; tell the issuer
        # what is missing instead.
        if gk.gatekeep_approve_role is None:
            await ctx.send(
                "No approval role has been set. Use `/gatekeep set-role` first.",
                ephemeral=True,
            )
            return

        await user.add_role(int(gk.gatekeep_approve_role))

        # If there is no approval message set, inform the issuer privately.
        if gk.gatekeep_approve_message is None:
            await ctx.send(
                f"{user.mention} has been approved.\nNB: No approval message has been sent.",
                ephemeral=True,
            )
            return

        # Resolve the welcome channel, if one is configured and retrievable.
        channel = None
        if jl.message_channel is not None:
            channel = await ctx.guild.fetch_channel(jl.message_channel)

        # No usable welcome channel: attempt to DM the approval message.
        if not channel:
            await ctx.send(
                f"{user.mention} has been approved.\nNB: No welcome channel has been set attempting to DM {user.mention}",
                ephemeral=True,
            )
            await user.send(
                self._render_approval_message(
                    gk.gatekeep_approve_message, user, ctx.guild
                )
            )
            return

        # Otherwise send the approval message to the welcome channel.
        await channel.send(
            self._render_approval_message(
                gk.gatekeep_approve_message, user, ctx.guild
            )
        )
        await ctx.send(f"{user.mention} has been approved.", ephemeral=True)

    # ------------------------------------------------------------------
    # Configuration commands
    # ------------------------------------------------------------------

    @slash_command(
        name="gatekeep",
        description="The gatekeep system",
        sub_cmd_name="set-type",
        sub_cmd_description="Set how Gatekeep allows users to join",
        dm_permission=False,
        default_member_permissions=Permissions.MANAGE_GUILD,
    )
    @slash_option(
        name="type",
        description="Type of Gatekeep",
        required=True,
        opt_type=OptionTypes.STRING,
        choices=[
            SlashCommandChoice(
                name="Manual",
                value="manual",
            ),
            SlashCommandChoice(
                name="Captcha",
                value="captcha",
            ),
        ],
    )
    async def set_type_command(self, ctx: InteractionContext, type_: str):
        """Store the gatekeep method ("manual" or "captcha") for this guild."""
        # NOTE(review): the option is registered as "type" while the parameter
        # is named `type_`; confirm the framework maps the option onto this
        # parameter name.
        gk: GatekeepModel
        gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)

        gk.gatekeep_method = type_
        gk.save()

        await ctx.send(f"Gatekeep type set to {type_}", ephemeral=True)

    @slash_command(
        name="gatekeep",
        description="The gatekeep system",
        sub_cmd_name="set-role",
        sub_cmd_description="Set the role that users gain upon approval",
        dm_permission=False,
        default_member_permissions=Permissions.MANAGE_GUILD,
    )
    @slash_option(
        name="role",
        description="Role to grant",
        required=True,
        opt_type=OptionTypes.ROLE,
    )
    async def set_role_command(self, ctx: InteractionContext, role: Role):
        """Store the role granted to approved users for this guild."""
        gk: GatekeepModel
        gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
        gk.gatekeep_approve_role = role.id
        gk.save()
        await ctx.send(f"Gatekeep role set to {role.name}", ephemeral=True)

    @slash_command(
        name="gatekeep",
        description="The gatekeep system",
        sub_cmd_name="set-message",
        sub_cmd_description="Set the message that users see when they are approved",
        dm_permission=False,
        default_member_permissions=Permissions.MANAGE_GUILD,
    )
    @slash_option(
        name="message",
        description="Message to send",
        required=True,
        opt_type=OptionTypes.STRING,
    )
    async def set_message_command(self, ctx: InteractionContext, message: str):
        """Store the approval message template for this guild.

        The template is later filled with ``str.format(member=..., guild=...)``.
        """
        gk: GatekeepModel
        gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
        gk.gatekeep_approve_message = message
        gk.save()
        await ctx.send(f"Gatekeep message set to {message}", ephemeral=True)

    # ------------------------------------------------------------------
    # Manual approval
    # ------------------------------------------------------------------

    @slash_command(
        name="approve",
        description="Approve a user",
        dm_permission=False,
        default_member_permissions=Permissions.MANAGE_ROLES,
    )
    @slash_option(
        name="user",
        description="User to approve",
        required=True,
        opt_type=OptionTypes.USER,
    )
    async def approve_command(self, ctx: InteractionContext, user: Member):
        """Approve *user* via the ``/approve`` slash command."""
        await self._approve_member(ctx, user)

    @context_menu(
        name="Approve User",
        dm_permission=False,
        default_member_permissions=Permissions.MANAGE_ROLES,
        context_type=CommandTypes.USER,
    )
    async def approve_context_menu(self, ctx: InteractionContext):
        """Approve the targeted user via the "Approve User" context menu."""
        user: Member = ctx.target
        await self._approve_member(ctx, user)

    # Allow the use of a reaction to approve a user.
    # This is mainly for compatibility with NLL.
    # The permission thingy should probably be reworked, as it currently allows anyone
    # with the manage roles permission to use this.
    # TODO: Rewrite this to require a specific role.
    @listen(events.MessageReactionAdd)
    async def on_reaction_add(self, reaction: events.MessageReactionAdd):
        """Approve a message's author when a permitted user reacts to it.

        Only reactions in the configured welcome channel, with one of the
        listed emoji, from a user holding MANAGE_ROLES are acted upon.
        """
        # NOTE(review): the three emoji literals below are empty strings in
        # this copy of the file; they were presumably lost in transit.
        # Confirm the intended emoji against the repository.
        if reaction.emoji.name not in [
            "",
            "",
            "",
        ] or not reaction.author.has_permission(Permissions.MANAGE_ROLES):
            return

        gk: GatekeepModel
        gk, _ = GatekeepModel.get_or_create(guild_id=reaction.guild.id)
        jl: JoinLeaveModel
        jl, _ = JoinLeaveModel.get_or_create(guild_id=reaction.guild.id)

        # NOTE(review): compares the channel id to the stored value directly;
        # verify both sides have the same type.
        if reaction.message.channel.id != jl.message_channel:
            return

        # Nothing to grant if no approval role is configured (`int(None)`
        # would raise otherwise).
        if gk.gatekeep_approve_role is None:
            return

        await reaction.message.author.add_role(int(gk.gatekeep_approve_role))

        # Do not post the literal text "None" when no message is configured.
        if gk.gatekeep_approve_message is None:
            return

        await reaction.message.channel.send(
            self._render_approval_message(
                gk.gatekeep_approve_message, reaction.message.author, reaction.guild
            )
        )

    # ------------------------------------------------------------------
    # Captcha flow
    # ------------------------------------------------------------------

    @slash_command(
        name="captcha",
        description="Complete a captcha",
        dm_permission=False,
    )
    @slash_option(
        name="captcha",
        description="The captcha solution",
        required=True,
        opt_type=OptionTypes.STRING,
    )
    async def captcha_command(self, ctx: InteractionContext, captcha: str):
        """Check the caller's captcha answer and approve them if it matches."""
        gk: GatekeepModel = GatekeepModel.get_or_none(
            GatekeepModel.guild_id == ctx.guild.id,
        )
        if gk is None:
            await ctx.send("No gatekeep set", ephemeral=True)
            return

        gkc: GatekeepCaptchasModel = GatekeepCaptchasModel.get_or_none(
            GatekeepCaptchasModel.guild_id == ctx.guild.id,
            GatekeepCaptchasModel.user_id == ctx.author.id,
        )
        if gkc is None:
            await ctx.send("You have no captcha to complete.", ephemeral=True)
            return

        # Solutions are digits only, so surrounding whitespace is never
        # significant.
        if gkc.captcha != captcha.strip():
            await ctx.send("Your captcha was incorrect.", ephemeral=True)
            return

        # Keep the pending captcha if the guild has no role to grant yet.
        if gk.gatekeep_approve_role is None:
            await ctx.send(
                "No approval role has been set for this server. "
                "Please contact a moderator.",
                ephemeral=True,
            )
            return

        # Grant the role first and only then discard the captcha, so a failed
        # role assignment does not leave the user without a way to retry.
        await ctx.author.add_role(int(gk.gatekeep_approve_role))
        gkc.delete_instance()

        # Avoid posting the literal text "None" when no message is configured.
        if gk.gatekeep_approve_message is None:
            await ctx.send(f"{ctx.author.mention} has been approved.", ephemeral=True)
            return

        await ctx.send(
            self._render_approval_message(
                gk.gatekeep_approve_message, ctx.author, ctx.guild
            ),
            ephemeral=False,
        )

    @listen(events.MemberAdd)
    async def on_member_join(self, member_add: events.MemberAdd):
        """Issue a captcha to a newly joined member when captcha mode is on.

        The captcha is posted in the configured welcome channel, or DMed to
        the member when no channel is set or it cannot be fetched.
        """
        member = member_add.member

        gk: GatekeepModel
        gk, _ = GatekeepModel.get_or_create(guild_id=member.guild.id)
        gs: GuildSettingsModel
        gs, _ = GuildSettingsModel.get_or_create(guild_id=member.guild.id)
        jl: JoinLeaveModel
        jl, _ = JoinLeaveModel.get_or_create(guild_id=member.guild.id)

        if gk.gatekeep_method != "captcha" or not gs.use_gatekeep:
            return

        gkc: GatekeepCaptchasModel
        gkc, _ = GatekeepCaptchasModel.get_or_create(
            guild_id=member.guild.id, user_id=member.id
        )
        captcha_text = self._new_captcha_text()
        gkc.captcha = captcha_text
        gkc.save()

        # Prefer the welcome channel; fall back to a DM.
        channel = None
        if jl.message_channel is not None:
            channel = await member.guild.fetch_channel(jl.message_channel)
        target = member if channel is None else channel

        await self._send_captcha(target, member.guild.name, member.id, captcha_text)

    @listen(events.ButtonPressed)
    async def on_regenerate_button(self, button: events.ButtonPressed):
        """Replace the presser's captcha when they use the Regenerate button."""
        ctx = button.ctx

        # Ignore buttons that are not ours: the id must be exactly
        # "<prefix>:<user id>".
        id_parts = ctx.custom_id.split(":")
        if len(id_parts) != 2 or id_parts[0] != self._REGENERATE_PREFIX:
            return
        if not id_parts[1].isdigit():
            return

        if ctx.author.id != int(id_parts[1]):
            await ctx.send("You are not the owner of this captcha.", ephemeral=True)
            return

        # The button is also sent by DM, where no guild is expected on the
        # context (NOTE(review): confirm). The custom id carries no guild id,
        # so the captcha record cannot be located from there.
        if ctx.guild is None:
            await ctx.send(
                "Captchas can only be regenerated from within the server.",
                ephemeral=True,
            )
            return

        gkc: GatekeepCaptchasModel
        gkc = GatekeepCaptchasModel.get_or_none(
            guild_id=ctx.guild.id, user_id=ctx.author.id
        )
        if gkc is None:
            await ctx.send("You have no captcha to regenerate.", ephemeral=True)
            return

        captcha_text = self._new_captcha_text()
        gkc.captcha = captcha_text
        gkc.save()

        msg = await self._send_captcha(
            ctx, ctx.guild.name, ctx.author.id, captcha_text
        )

        # Remove the outdated captcha once its replacement has been delivered.
        if msg is not None:
            await ctx.message.delete()


def setup(client: Client):
    """Extension entry point: create the gatekeep tables and register the extension.

    Args:
        client: The bot client the ``Gatekeep`` extension is attached to.
    """
    # Make sure both tables used by this extension exist before it is loaded.
    for model in (GatekeepModel, GatekeepCaptchasModel):
        model.create_table()

    Gatekeep(client)
    logging.info("Gatekeep extension loaded")