# pylint: disable=unsubscriptable-object
"""Slash commands for viewing, issuing and removing member infractions."""
import asyncio
import logging
from typing import Dict, List, Optional

from naff import (
    Extension,
    slash_command,
    slash_option,
    OptionTypes,
    InteractionContext,
    Embed,
    EmbedField,
    Permissions,
    Member,
    Client,
    ActionRow,
    Button,
    ButtonStyles,
)
from naff.ext.paginators import Paginator
from peewee import fn

from database import (
    GuildSettings,
    Infractions as InfractionsModel,
)

# Discord rejects messages carrying more than this many embeds, so long
# infraction lists are sent in chunks of this size.
MAX_EMBEDS_PER_MESSAGE = 10

# Seconds the "remove infraction" confirmation buttons stay active.
REMOVE_CONFIRM_TIMEOUT = 60


class Infractions(Extension):
    """Extension exposing the infraction slash commands."""

    def __init__(self, client):
        self.client: Client = client

    @slash_command(
        name="infractions", description="View your infractions", dm_permission=False
    )
    async def infractions_command(self, ctx: InteractionContext):
        """Show the invoking user their own infractions in this guild.

        The reply is ephemeral; one embed is built per infraction and the
        embeds are presented through a paginator.
        """
        await ctx.defer(ephemeral=True)

        infractions: List[InfractionsModel] = InfractionsModel.select().where(
            InfractionsModel.guild_id == int(ctx.guild_id),
            InfractionsModel.user_id == int(ctx.author.id),
        )
        if len(infractions) == 0:
            await ctx.send("You have no infractions.", ephemeral=True)
            return

        embeds: List[Embed] = [
            Embed(
                title="Infraction",
                description=f"{infraction.reason}",
                color=infraction_colour(infraction.weight),
                fields=[
                    # NOTE(review): this value is empty in the source; it looks
                    # like a timestamp placeholder was lost. Discord is
                    # expected to reject empty field values - confirm and
                    # restore the intended text.
                    EmbedField(name="**Received**", value=f"", inline=True),
                    EmbedField(
                        name="**Severity**", value=f"{infraction.weight}", inline=True
                    ),
                ],
            )
            for infraction in infractions  # pylint: disable=not-an-iterable
        ]

        paginator = Paginator.create_from_embeds(self.client, *embeds)
        await paginator.send(ctx)

    @slash_command(
        name="user-infractions",
        description="View a user's infractions",
        dm_permission=False,
        default_member_permissions=Permissions.KICK_MEMBERS,
    )
    @slash_option(
        name="user",
        description="User to view",
        required=True,
        opt_type=OptionTypes.USER,
    )
    async def user_infractions(self, ctx: InteractionContext, user: Member):
        """List every infraction recorded for ``user`` in this guild.

        Sends a summary line (count and total weight) followed by one embed
        per infraction. Embeds are sent in chunks of
        ``MAX_EMBEDS_PER_MESSAGE`` so that any number of infractions can be
        displayed.
        """
        await ctx.defer(ephemeral=False)

        infractions: List[InfractionsModel] = InfractionsModel.select().where(
            InfractionsModel.guild_id == int(ctx.guild_id),
            InfractionsModel.user_id == int(user.id),
        )
        if len(infractions) == 0:
            await ctx.send(f"{user.mention} has no infractions.")
            return

        # Aggregate query: total weight and number of infractions for the user.
        infractions_weight: List[InfractionsModel] = InfractionsModel.select(
            InfractionsModel.user_id,
            fn.SUM(InfractionsModel.weight).alias("total_weight"),
            fn.COUNT(InfractionsModel.id).alias("total_infractions"),
        ).where(
            InfractionsModel.guild_id == int(ctx.guild_id),
            InfractionsModel.user_id == int(user.id),
        )

        # Cache issuer names so each issuer is fetched at most once.
        issuer_names: Dict[object, str] = {}
        embeds: List[Embed] = []
        for infraction in infractions:  # pylint: disable=not-an-iterable
            issuer_id = infraction.given_by
            if issuer_id not in issuer_names:
                issuer = await self.client.fetch_member(
                    guild_id=ctx.guild_id, user_id=issuer_id
                )
                # fetch_member yields None when the member cannot be found
                # (e.g. the issuer has left the guild).
                issuer_names[issuer_id] = (
                    issuer.display_name
                    if issuer is not None
                    else f"Unknown member ({issuer_id})"
                )

            embeds.append(
                Embed(
                    title=f"Infraction for user {user.display_name} ({user.username}#{user.discriminator}, {user.id})",  # pylint: disable=line-too-long
                    description=f"{infraction.reason}",
                    color=infraction_colour(infraction.weight),
                    fields=[
                        EmbedField(name="**ID**", value=f"{infraction.id}", inline=True),
                        # NOTE(review): empty value in the source; likely a
                        # lost timestamp placeholder - confirm and restore.
                        EmbedField(name="**Received**", value=f"", inline=True),
                        EmbedField(
                            name="**Severity**",
                            value=f"{infraction.weight}",
                            inline=True,
                        ),
                        EmbedField(
                            name="**Issued by**",
                            value=issuer_names[issuer_id],
                            inline=True,
                        ),
                    ],
                )
            )

        summary = (
            f"{user.mention} has {infractions_weight[0].total_infractions} infractions, for a total weight of {infractions_weight[0].total_weight:.2f}"  # pylint: disable=line-too-long
            if infractions_weight.count() > 0
            else None
        )

        # Only the first message carries the summary line.
        for start in range(0, len(embeds), MAX_EMBEDS_PER_MESSAGE):
            await ctx.send(
                content=summary if start == 0 else None,
                embeds=embeds[start:start + MAX_EMBEDS_PER_MESSAGE],
                ephemeral=False,
            )

    @slash_command(
        name="warn",
        description="Warn a user",
        dm_permission=False,
        default_member_permissions=Permissions.KICK_MEMBERS,
    )
    @slash_option(
        name="user",
        description="User to warn",
        required=True,
        opt_type=OptionTypes.USER,
    )
    @slash_option(
        name="reason",
        description="Reason for warning",
        required=False,
        opt_type=OptionTypes.STRING,
    )
    @slash_option(
        name="weight",
        description="Severity of warning",
        required=False,
        opt_type=OptionTypes.NUMBER,
        min_value=0.0,
        max_value=10.0,
    )
    @slash_option(
        name="silent",
        description="Silent warning (will not notify user)",
        required=False,
        opt_type=OptionTypes.BOOLEAN,
    )
    async def warn_user(
        self,
        ctx: InteractionContext,
        user: Member,
        reason: Optional[str] = None,
        weight: Optional[float] = None,
        silent: Optional[bool] = None,
    ):
        """Record a warning against ``user`` and notify the relevant parties.

        Omitted options default to an empty reason, a weight of ``1.0`` and
        a non-silent warning. Unless ``silent`` is set, the user is sent a
        DM; if that DM fails, the invoker is told so. When the guild has an
        admin channel configured, a notice is posted there as well.
        """
        # NOTE(review): the defer is public while the follow-ups request
        # ephemeral=True - confirm which visibility is intended.
        await ctx.defer(ephemeral=False)

        if weight is None:
            weight = 1.0
        if silent is None:
            silent = False
        if reason is None:
            reason = ""

        InfractionsModel.create(
            guild_id=int(ctx.guild_id),
            user_id=int(user.id),
            given_by=int(ctx.author.id),
            reason=reason,
            weight=weight,
            silent=silent,
        )

        dm_failed = False
        if not silent:
            try:
                await user.send(
                    embed=Embed(
                        title=f"You have received a warning in {ctx.guild.name}",
                        description=f"{reason}",
                        color=infraction_colour(weight),
                        fields=[
                            EmbedField(name="**Severity**", value=f"{weight}"),
                            # NOTE(review): empty value in the source; likely
                            # a lost timestamp placeholder - confirm.
                            EmbedField(
                                name="**Issued at**",
                                value=f"",  # pylint: disable=no-member
                            ),
                        ],
                    )
                )
            except Exception:  # pylint: disable=broad-except
                # Best effort: a failed DM must not abort the warning; the
                # invoker is informed at the end of the command instead.
                dm_failed = True

        await ctx.send(
            f'Warned {user.display_name} ({user.username}#{user.discriminator}, {user.id}) for "{reason}" with severity {weight}',  # pylint: disable=line-too-long
            ephemeral=True,
        )

        guild_settings: Optional[GuildSettings] = GuildSettings.get_or_none(
            GuildSettings.guild_id == int(ctx.guild_id)
        )
        if guild_settings is not None and guild_settings.admin_channel is not None:
            admin_channel = await self.client.fetch_channel(
                int(guild_settings.admin_channel)
            )
            if admin_channel is not None:
                await admin_channel.send(
                    embed=Embed(
                        title=f"Warned {user.display_name} ({user.username}#{user.discriminator}, {user.id})",  # pylint: disable=line-too-long
                        description=f"{reason}",
                        # NOTE(review): 0x0000FF is passed as a *weight*, so
                        # this always resolves to 0xFF2211. Either the literal
                        # colour or infraction_colour(weight) was probably
                        # intended - confirm before changing.
                        color=infraction_colour(0x0000FF),
                        fields=[],
                    )
                )

        if dm_failed:
            await ctx.send(
                f"{user.mention} has been warned, but I couldn't DM them.",
                ephemeral=True,
            )

    @slash_command(
        name="remove-infraction",
        description="Remove an infraction",
        dm_permission=False,
        default_member_permissions=Permissions.KICK_MEMBERS,
    )
    @slash_option(
        name="infraction-id",
        description="ID of infraction to remove",
        required=True,
        opt_type=OptionTypes.INTEGER,
    )
    async def remove_infraction(self, ctx: InteractionContext, infraction_id: int):
        """Delete an infraction after an explicit button confirmation.

        Infractions belonging to another guild are reported as non-existent.
        The confirmation prompt expires after ``REMOVE_CONFIRM_TIMEOUT``
        seconds.
        """
        await ctx.defer(ephemeral=True)

        infraction: Optional[InfractionsModel] = InfractionsModel.get_or_none(
            InfractionsModel.id == infraction_id
        )
        # Same message for "missing" and "other guild" so that IDs from other
        # guilds are not disclosed.
        if infraction is None or infraction.guild_id != int(ctx.guild_id):
            await ctx.send("That infraction doesn't exist.", ephemeral=True)
            return

        # pylint: disable=unexpected-keyword-arg
        components = [
            ActionRow(
                Button(
                    custom_id=f"remove-infraction:remove:{infraction.id}",
                    style=ButtonStyles.DANGER,
                    label="Remove infraction",
                ),
                Button(
                    custom_id=f"remove-infraction:cancel:{infraction.id}",
                    style=ButtonStyles.SECONDARY,
                    label="Cancel",
                ),
            )
        ]

        given_by: Optional[Member] = await self.client.fetch_member(
            f"{infraction.given_by}", ctx.guild_id
        )
        # The issuer may no longer be a member of the guild.
        if given_by is not None:
            issued_by = (
                f"{given_by.display_name} ({given_by.username}#{given_by.discriminator})"
            )
        else:
            issued_by = f"Unknown member ({infraction.given_by})"

        await ctx.send(
            content="Remove this infraction? (times out in 60 seconds)",
            embed=Embed(
                title="Infraction",
                description=f"{infraction.reason}",
                color=infraction_colour(infraction.weight),
                fields=[
                    EmbedField(name="**ID**", value=f"{infraction.id}"),
                    # NOTE(review): empty value in the source; likely a lost
                    # timestamp placeholder - confirm and restore.
                    EmbedField(name="**Received**", value=f""),
                    EmbedField(name="**Severity**", value=f"{infraction.weight}"),
                    EmbedField(name="**Issued by**", value=issued_by),
                ],
            ),
            ephemeral=True,
            components=components,
        )

        try:
            used_comp = await self.client.wait_for_component(
                components=components, timeout=REMOVE_CONFIRM_TIMEOUT
            )
        except (asyncio.TimeoutError, TimeoutError):
            # asyncio.TimeoutError is only an alias of the builtin
            # TimeoutError from Python 3.11 on; catch both so the timeout is
            # handled on older interpreters too.
            await ctx.send("Timed out.", ephemeral=True)
            return

        (group, action, inf_id) = used_comp.context.custom_id.split(":")
        if group != "remove-infraction" or action == "cancel":
            await used_comp.context.send("Cancelled.", ephemeral=True)
            return
        if action == "remove" and inf_id == str(infraction.id):
            infraction.delete_instance()
            await used_comp.context.send("Removed.", ephemeral=True)


def infraction_colour(w: float) -> int:
    """Return the embed colour for an infraction of weight ``w``.

    Weights below 0.5 map to grey, below 1.0 to orange, anything else to red.
    """
    if w < 0.5:
        return 0xBCBCBC
    if w < 1.0:
        return 0xFF7711
    return 0xFF2211


def setup(client):
    """Create the infractions table and register the extension on ``client``."""
    InfractionsModel.create_table()
    Infractions(client)
    logging.info("Infractions extension loaded.")