Added Gatekeep features.

Added Gatekeep, a system where users require approval to fully join the server.
Currently, "manual" and "captcha" modes are available.
Manual mode requires someone to use the "/approve" command, or the "approve" context menu on a user to approve them.
Captcha mode allows the former, but lets the user complete a captcha in order to join.
The captcha is provided as both an image and as text.
This commit is contained in:
Vegard Berg 2022-08-04 03:52:53 +02:00
parent 40b347c875
commit 5cf3cdbb08
5 changed files with 355 additions and 1 deletions

View File

@ -129,6 +129,7 @@ if __name__ == "__main__":
load_dotenv()
bot.load_extension("commands.admin")
bot.load_extension("commands.gatekeep")
bot.load_extension("commands.quote")
bot.load_extension("commands.infractions")
bot.load_extension("commands.self_roles")

306
commands/gatekeep.py Normal file
View File

@ -0,0 +1,306 @@
import logging
import random, string
from io import BytesIO
from naff import (
Extension,
Client,
slash_command,
slash_option,
context_menu,
listen,
CommandTypes,
InteractionContext,
OptionTypes,
Role,
Member,
File,
SlashCommandChoice,
)
from naff.api import events
from naff.models.discord.enums import Permissions
from captcha.image import ImageCaptcha
from captcha.audio import AudioCaptcha
from database import (
GuildSettings as GuildSettingsModel,
Gatekeep as GatekeepModel,
JoinLeave as JoinLeaveModel,
GatekeepCaptchas as GatekeepCaptchasModel,
)
class Gatekeep(Extension):
def __init__(self, client: Client):
self.client = client
@slash_command(
name="gatekeep",
description="The gatekeep system",
sub_cmd_name="set-type",
sub_cmd_description="Set how Gatekeep allows users to join",
dm_permission=False,
default_member_permissions=Permissions.MANAGE_GUILD,
)
@slash_option(
name="type",
description="Type of Gatekeep",
required=True,
opt_type=OptionTypes.STRING,
choices=[
SlashCommandChoice(
name="Manual",
value="manual",
),
SlashCommandChoice(
name="Captcha",
value="captcha",
),
],
)
async def set_type_command(self, ctx: InteractionContext, type: str):
gk: GatekeepModel
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
gk.gatekeep_method = type
gk.save()
await ctx.send(f"Gatekeep type set to {type}", ephemeral=True)
@slash_command(
name="gatekeep",
description="The gatekeep system",
sub_cmd_name="set-role",
sub_cmd_description="Set the role that users gain upon approval",
dm_permission=False,
default_member_permissions=Permissions.MANAGE_GUILD,
)
@slash_option(
name="role",
description="Role to grant",
required=True,
opt_type=OptionTypes.ROLE,
)
async def set_role_command(self, ctx: InteractionContext, role: Role):
gk: GatekeepModel
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
gk.gatekeep_approve_role = role.id
gk.save()
await ctx.send(f"Gatekeep role set to {role.name}", ephemeral=True)
@slash_command(
name="gatekeep",
description="The gatekeep system",
sub_cmd_name="set-message",
sub_cmd_description="Set the message that users see when they are approved",
dm_permission=False,
default_member_permissions=Permissions.MANAGE_GUILD,
)
@slash_option(
name="message",
description="Message to send",
required=True,
opt_type=OptionTypes.STRING,
)
async def set_message_command(self, ctx: InteractionContext, message: str):
gk: GatekeepModel
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
gk.gatekeep_approve_message = message
gk.save()
await ctx.send(f"Gatekeep message set to {message}", ephemeral=True)
@slash_command(
name="approve",
description="Approve a user",
dm_permission=False,
default_member_permissions=Permissions.MANAGE_ROLES,
)
@slash_option(
name="user",
description="User to approve",
required=True,
opt_type=OptionTypes.USER,
)
async def approve_command(self, ctx: InteractionContext, user: Member):
gk: GatekeepModel
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
jl: JoinLeaveModel
jl, _ = JoinLeaveModel.get_or_create(guild_id=ctx.guild.id)
await user.add_role(int(gk.gatekeep_approve_role))
welcome_channel = not jl.message_channel is None
if gk.gatekeep_approve_message is None:
await ctx.send(
f"{user.mention} has been approved.\nNB: No approval message has been sent.",
ephemeral=True,
)
return
if not welcome_channel:
await ctx.send(
f"{user.mention} has been approved.\nNB: No welcome channel has been set attempting to DM {user.mention}",
ephemeral=True,
)
await user.send(
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
)
return
channel = await ctx.guild.fetch_channel(jl.message_channel)
if not channel:
await ctx.send(
f"{user.mention} has been approved.\nNB: No welcome channel has been set attempting to DM {user.mention}",
ephemeral=True,
)
await user.send(
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
)
return
await channel.send(
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
)
await ctx.send(f"{user.mention} has been approved.", ephemeral=True)
@context_menu(
name="approve",
dm_permission=False,
default_member_permissions=Permissions.MANAGE_ROLES,
context_type=CommandTypes.USER,
)
async def approve_context_menu(self, ctx: InteractionContext, user: Member):
gk: GatekeepModel
gk, _ = GatekeepModel.get_or_create(guild_id=ctx.guild.id)
jl: JoinLeaveModel
jl, _ = JoinLeaveModel.get_or_create(guild_id=ctx.guild.id)
user.add_role(int(gk.gatekeep_approve_role))
welcome_channel = not jl.message_channel is None
if gk.gatekeep_approve_message is None:
await ctx.send(
f"{user.mention} has been approved.\nNB: No approval message has been sent.",
ephemeral=True,
)
return
if not welcome_channel:
await ctx.send(
f"{user.mention} has been approved.\nNB: No welcome channel has been set attempting to DM {user.mention}",
ephemeral=True,
)
await user.send(
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
)
return
channel = await ctx.guild.fetch_channel(jl.message_channel)
if not channel:
await ctx.send(
f"{user.mention} has been approved.\nNB: No welcome channel has been set attempting to DM {user.mention}",
ephemeral=True,
)
await user.send(
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
)
return
await channel.send(
str(gk.gatekeep_approve_message).format(member=user, guild=ctx.guild)
)
@slash_command(
name="captcha",
description="Complete a captcha",
dm_permission=False,
)
@slash_option(
name="captcha",
description="The captcha solution",
required=True,
opt_type=OptionTypes.STRING,
)
async def captcha_command(self, ctx: InteractionContext, captcha: str):
gk: GatekeepModel = GatekeepModel.get_or_none(
GatekeepModel.guild_id == ctx.guild.id,
)
if gk is None:
await ctx.send("No gatekeep set", ephemeral=True)
return
gkc: GatekeepCaptchasModel = GatekeepCaptchasModel.get_or_none(
GatekeepCaptchasModel.guild_id == ctx.guild.id,
GatekeepCaptchasModel.user_id == ctx.author.id,
)
if gkc is None:
await ctx.send("You have no captcha to complete.", ephemeral=True)
return
if gkc.captcha != captcha:
await ctx.send("Your captcha was incorrect.", ephemeral=True)
return
gkc.delete()
await ctx.author.add_role(int(gk.gatekeep_approve_role))
await ctx.send(
str(gk.gatekeep_approve_message).format(member=ctx.author, guild=ctx.guild),
ephemeral=False,
)
@listen(events.MemberAdd)
async def on_member_join(self, member_add: events.MemberAdd):
member = member_add.member
gk: GatekeepModel
gk, _ = GatekeepModel.get_or_create(guild_id=member.guild.id)
gs: GuildSettingsModel
gs, _ = GuildSettingsModel.get_or_create(guild_id=member.guild.id)
jl: JoinLeaveModel
jl, _ = JoinLeaveModel.get_or_create(guild_id=member.guild.id)
if gk.gatekeep_method != "captcha" or not gs.use_gatekeep:
return
gkc: GatekeepCaptchasModel
gkc, _ = GatekeepCaptchasModel.get_or_create(
guild_id=member.guild.id, user_id=member.id
)
image = ImageCaptcha(width=240, height=90)
audio = AudioCaptcha()
captcha_text = "".join([random.choice(string.digits) for _ in range(7)])
gkc.captcha = captcha_text
gkc.save()
bio_image = BytesIO()
bio_audio = BytesIO()
image.write(captcha_text, bio_image, format="png")
bio_audio.write(audio.generate(captcha_text))
bio_image.seek(0)
bio_audio.seek(0)
if (
jl.message_channel is None
or (channel := await member.guild.fetch_channel(jl.message_channel)) is None
):
await member.send(
f"The server *{member.guild.name}* requires users to complete a captcha to join.\n"
"You can complete it by sending the command `/captcha <text>` in the server",
files=[
File(bio_image, file_name="captcha.png"),
File(bio_audio, file_name="captcha.wav"),
],
)
return
await channel.send(
f"The server *{member.guild.name}* requires users to complete a captcha to join.\n"
"You can complete it by sending the command `/captcha <text>` in the server",
files=[
File(bio_image, file_name="captcha.png"),
File(bio_audio, file_name="captcha.wav"),
],
)
def setup(client: Client):
GatekeepModel.create_table()
GatekeepCaptchasModel.create_table()
Gatekeep(client)
logging.info("Gatekeep extension loaded")

View File

@ -205,3 +205,24 @@ class BadNames(Model):
class Meta:
table_name = "BadNames"
database = db
class Gatekeep(Model):
guild_id = BigIntegerField(primary_key=True)
gatekeep_method = TextField(null=True)
gatekeep_approve_role = BigIntegerField(null=True)
gatekeep_approve_message = TextField(null=True)
class Meta:
table_name = "Gatekeep"
database = db
class GatekeepCaptchas(Model):
guild_id = BigIntegerField()
user_id = BigIntegerField()
captcha = TextField(null=True)
class Meta:
primary_key = CompositeKey("guild_id", "user_id")
table_name = "GatekeepCaptchas"
database = db

27
poetry.lock generated
View File

@ -84,6 +84,17 @@ d = ["aiohttp (>=3.7.4)"]
jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"]
uvloop = ["uvloop (>=0.15.2)"]
[[package]]
name = "captcha"
version = "0.4"
description = "A captcha library that generates audio and image CAPTCHAs."
category = "main"
optional = false
python-versions = "*"
[package.dependencies]
Pillow = "*"
[[package]]
name = "charset-normalizer"
version = "2.1.0"
@ -242,6 +253,18 @@ category = "main"
optional = false
python-versions = "*"
[[package]]
name = "pillow"
version = "9.2.0"
description = "Python Imaging Library (Fork)"
category = "main"
optional = false
python-versions = ">=3.7"
[package.extras]
docs = ["furo", "olefile", "sphinx (>=2.4)", "sphinx-copybutton", "sphinx-issues (>=3.0.1)", "sphinx-removed-in", "sphinxext-opengraph"]
tests = ["check-manifest", "coverage", "defusedxml", "markdown2", "olefile", "packaging", "pyroma", "pytest", "pytest-cov", "pytest-timeout"]
[[package]]
name = "platformdirs"
version = "2.5.2"
@ -334,7 +357,7 @@ multidict = ">=4.0"
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "3917de69112df241cadc7031034f6654e92f81df42f7bf627192ab88325e7e92"
content-hash = "46c9e2b17bb873afeedc6da8869c9455b10bec8648e466b05daf262fa178cdd8"
[metadata.files]
aiohttp = []
@ -346,6 +369,7 @@ attrs = [
{file = "attrs-21.4.0.tar.gz", hash = "sha256:626ba8234211db98e869df76230a137c4c40a12d72445c45d5f5b716f076e2fd"},
]
black = []
captcha = []
charset-normalizer = []
click = []
colorama = [
@ -428,6 +452,7 @@ naff = []
orjson = []
pathspec = []
peewee = []
pillow = []
platformdirs = []
pylint = []
python-dotenv = []

View File

@ -10,6 +10,7 @@ peewee = "^3.15.1"
naff = "^1.6.0"
orjson = "^3.7.8"
python-dotenv = "^0.20.0"
captcha = "^0.4"
[tool.poetry.dev-dependencies]
black = "^22.6.0"