Merge pull request 'Add keyword notification feature' (#11) from feature/keyword-notify into main
continuous-integration/drone/push Build is passing
Details
continuous-integration/drone/push Build is passing
Details
Reviewed-on: #11 Reviewed-by: konradomen <konrad.szychowiak+myrkvi@gmail.com>
This commit is contained in:
commit
eb37f41b45
|
@ -5,7 +5,7 @@ name: test
|
||||||
|
|
||||||
platform:
|
platform:
|
||||||
os: linux
|
os: linux
|
||||||
arch: arm64
|
#arch: arm64
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: greeting
|
- name: greeting
|
||||||
|
|
17
database.py
17
database.py
|
@ -232,3 +232,20 @@ class PollVotes(Model):
|
||||||
class Meta:
|
class Meta:
|
||||||
table_name = "PollVotes"
|
table_name = "PollVotes"
|
||||||
database = db
|
database = db
|
||||||
|
|
||||||
|
class KeywordNotify(Model):
|
||||||
|
"""ID of the keyword notify entry"""
|
||||||
|
id = AutoField()
|
||||||
|
|
||||||
|
"""ID of the guild this belongs to"""
|
||||||
|
guild_id = BigIntegerField()
|
||||||
|
|
||||||
|
"""ID of the user this belongs to"""
|
||||||
|
user_id = BigIntegerField()
|
||||||
|
|
||||||
|
"""The keyword to match – in lower case"""
|
||||||
|
keyword = CharField(max_length=24)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
table_name = "KeywordNotify"
|
||||||
|
database = db
|
|
@ -2,6 +2,7 @@ import logging
|
||||||
from os import getenv
|
from os import getenv
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
|
import sentry_sdk as sentry
|
||||||
from naff import (
|
from naff import (
|
||||||
Client,
|
Client,
|
||||||
Guild,
|
Guild,
|
||||||
|
@ -111,8 +112,9 @@ class HeimdallrClient(Client):
|
||||||
|
|
||||||
@slash_command(name="ping", description="Ping the bot")
|
@slash_command(name="ping", description="Ping the bot")
|
||||||
async def ping_command(self, ctx: InteractionContext):
|
async def ping_command(self, ctx: InteractionContext):
|
||||||
ctx.ephemeral = True
|
with sentry.start_transaction(op="ping"):
|
||||||
await ctx.send("Pong!", components=RoleSelectMenu(placeholder="HONK"))
|
ctx.ephemeral = True
|
||||||
|
await ctx.send("Pong!", components=RoleSelectMenu(placeholder="HONK"))
|
||||||
|
|
||||||
@slash_command(name="bot-info", description="Get info about the bot")
|
@slash_command(name="bot-info", description="Get info about the bot")
|
||||||
async def bot_info_command(self, ctx: InteractionContext):
|
async def bot_info_command(self, ctx: InteractionContext):
|
||||||
|
@ -283,6 +285,8 @@ def main():
|
||||||
JoinLeave.create_table()
|
JoinLeave.create_table()
|
||||||
|
|
||||||
# Load extensions
|
# Load extensions
|
||||||
|
if (sentry_token := getenv("SENTRY_TOKEN")) is not None:
|
||||||
|
bot.load_extension("naff.ext.sentry", token=sentry_token)
|
||||||
bot.load_extension("heimdallr.commands.admin")
|
bot.load_extension("heimdallr.commands.admin")
|
||||||
bot.load_extension("heimdallr.commands.gatekeep")
|
bot.load_extension("heimdallr.commands.gatekeep")
|
||||||
bot.load_extension("heimdallr.commands.quote")
|
bot.load_extension("heimdallr.commands.quote")
|
||||||
|
@ -291,8 +295,10 @@ def main():
|
||||||
bot.load_extension("heimdallr.commands.polls")
|
bot.load_extension("heimdallr.commands.polls")
|
||||||
bot.load_extension("heimdallr.commands.bot_messages")
|
bot.load_extension("heimdallr.commands.bot_messages")
|
||||||
bot.load_extension("heimdallr.commands.modmail")
|
bot.load_extension("heimdallr.commands.modmail")
|
||||||
|
bot.load_extension("heimdallr.commands.keyword_notify")
|
||||||
bot.start(getenv("DISCORD_TOKEN"))
|
bot.start(getenv("DISCORD_TOKEN"))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
import naff.ext.sentry
|
||||||
main()
|
main()
|
||||||
|
|
|
@ -0,0 +1,214 @@
|
||||||
|
import logging
|
||||||
|
from typing import List, Dict, Set
|
||||||
|
from naff import (
|
||||||
|
AutocompleteContext,
|
||||||
|
Client,
|
||||||
|
Extension,
|
||||||
|
InteractionContext,
|
||||||
|
OptionTypes,
|
||||||
|
slash_command,
|
||||||
|
slash_option,
|
||||||
|
listen,
|
||||||
|
Button,
|
||||||
|
ButtonStyles,
|
||||||
|
events,
|
||||||
|
Message,
|
||||||
|
ChannelTypes,
|
||||||
|
)
|
||||||
|
from database import KeywordNotify as KeywordNotifyModel
|
||||||
|
|
||||||
|
|
||||||
|
class KeywordNotify(Extension):
|
||||||
|
def __init__(self, client: Client):
|
||||||
|
self.client: Client = client
|
||||||
|
self.guild_keywords: Dict[int, Set[str]] = {}
|
||||||
|
|
||||||
|
@listen(events.Ready)
|
||||||
|
async def ready_init(self):
|
||||||
|
for guild in self.bot.guilds:
|
||||||
|
keywords_q: List[KeywordNotifyModel] = (
|
||||||
|
KeywordNotifyModel.select(KeywordNotifyModel.keyword)
|
||||||
|
.where(KeywordNotifyModel.guild_id == guild.id)
|
||||||
|
.group_by(KeywordNotifyModel.keyword)
|
||||||
|
)
|
||||||
|
|
||||||
|
keywords = [str(kw.keyword) for kw in keywords_q]
|
||||||
|
self.guild_keywords[guild.id] = set(keywords)
|
||||||
|
|
||||||
|
@slash_command(
|
||||||
|
name="keyword-notify",
|
||||||
|
description="Get notifications when messages containing certain words are sent.",
|
||||||
|
dm_permission=False,
|
||||||
|
default_member_permissions=None,
|
||||||
|
)
|
||||||
|
async def keyword_notify(self, ctx: InteractionContext):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@keyword_notify.subcommand(
|
||||||
|
sub_cmd_name="add",
|
||||||
|
sub_cmd_description="Add a keyword to be notified about",
|
||||||
|
)
|
||||||
|
@slash_option(
|
||||||
|
name="keyword",
|
||||||
|
description="The keyword",
|
||||||
|
opt_type=OptionTypes.STRING,
|
||||||
|
required=True,
|
||||||
|
max_length=16,
|
||||||
|
)
|
||||||
|
async def keyword_notify_add(self, ctx: InteractionContext, kw: str):
|
||||||
|
keyword_q: KeywordNotifyModel
|
||||||
|
keyword_q, created = KeywordNotifyModel.get_or_create(
|
||||||
|
guild_id=ctx.guild.id, user_id=ctx.author.id, keyword=kw
|
||||||
|
)
|
||||||
|
|
||||||
|
if not created:
|
||||||
|
await ctx.send("Keyword already exists!", ephemeral=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
self.guild_keywords[int(keyword_q.guild_id)].add(kw)
|
||||||
|
await ctx.send("Keyword created!", ephemeral=True)
|
||||||
|
|
||||||
|
@keyword_notify.subcommand(
|
||||||
|
sub_cmd_name="remove",
|
||||||
|
sub_cmd_description="Remove a keyword to no longer be notified about it",
|
||||||
|
)
|
||||||
|
@slash_option(
|
||||||
|
name="keyword",
|
||||||
|
description="The keyword to remove",
|
||||||
|
opt_type=OptionTypes.INTEGER,
|
||||||
|
required=True,
|
||||||
|
autocomplete=True,
|
||||||
|
)
|
||||||
|
async def keyword_notify_remove(self, ctx: InteractionContext, kw_id: int):
|
||||||
|
guild_id = ctx.guild_id
|
||||||
|
user_id = ctx.author.id
|
||||||
|
|
||||||
|
print(f"Got keyword: {kw_id}")
|
||||||
|
|
||||||
|
keyword: KeywordNotifyModel = KeywordNotifyModel.get_or_none(id=kw_id)
|
||||||
|
|
||||||
|
if keyword.user_id != ctx.author.id:
|
||||||
|
await ctx.send(
|
||||||
|
"An error occurred attempting to remove this keyword. Please contact the developer with error code: KEYWORDNOTIFY_ERR_INVALID_AUTHOR",
|
||||||
|
ephemeral=True,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
keyword_name = keyword.keyword
|
||||||
|
|
||||||
|
KeywordNotifyModel.delete_by_id(keyword.id)
|
||||||
|
|
||||||
|
if (
|
||||||
|
KeywordNotifyModel.select()
|
||||||
|
.where(
|
||||||
|
KeywordNotifyModel.keyword == keyword_name,
|
||||||
|
KeywordNotifyModel.guild_id == ctx.guild.id,
|
||||||
|
)
|
||||||
|
.count()
|
||||||
|
== 0
|
||||||
|
):
|
||||||
|
self.guild_keywords[ctx.guild.id].remove(keyword_name)
|
||||||
|
await ctx.send(f"Keyword '{keyword_name}' was deleted", ephemeral=True)
|
||||||
|
|
||||||
|
@keyword_notify_remove.autocomplete("keyword")
|
||||||
|
async def keyword_notify_remove_keyword_autocomplete(
|
||||||
|
self, ctx: AutocompleteContext, keyword: str
|
||||||
|
):
|
||||||
|
keywords: List[KeywordNotifyModel] = get_user_keywords(
|
||||||
|
ctx.guild_id, ctx.author.id
|
||||||
|
)
|
||||||
|
|
||||||
|
choices = [{"name": x.keyword, "value": x.id} for x in keywords]
|
||||||
|
choices = list(filter(lambda x: x["name"].startswith(keyword), choices))
|
||||||
|
|
||||||
|
await ctx.send(choices=choices if len(choices) <= 25 else choices[:25])
|
||||||
|
|
||||||
|
@keyword_notify.subcommand(
|
||||||
|
sub_cmd_name="list",
|
||||||
|
sub_cmd_description="List the current keywords",
|
||||||
|
)
|
||||||
|
async def keyword_notify_list(self, ctx: InteractionContext):
|
||||||
|
guild_id = ctx.guild_id
|
||||||
|
user_id = ctx.author.id
|
||||||
|
|
||||||
|
keywords = get_user_keywords(guild_id, user_id)
|
||||||
|
|
||||||
|
if len(keywords) == 0:
|
||||||
|
await ctx.send(ephemeral=True, content="You have no keywords.")
|
||||||
|
return
|
||||||
|
|
||||||
|
keywords = [kw.keyword for kw in keywords]
|
||||||
|
|
||||||
|
kw_string = "- " + "\n- ".join(keywords)
|
||||||
|
|
||||||
|
await ctx.send(
|
||||||
|
ephemeral=True,
|
||||||
|
content=f"Your current keywords in this server are:" "\n" f"{kw_string}",
|
||||||
|
)
|
||||||
|
|
||||||
|
@listen(events.MessageCreate)
|
||||||
|
async def on_message_check_keyword(self, event: events.MessageCreate):
|
||||||
|
if event.message.channel.type in [ChannelTypes.DM, ChannelTypes.GROUP_DM]:
|
||||||
|
return
|
||||||
|
|
||||||
|
guild = event.message.channel.guild
|
||||||
|
|
||||||
|
self.guild_keywords[guild.id]
|
||||||
|
|
||||||
|
for keyword in self.guild_keywords[guild.id]:
|
||||||
|
if keyword in event.message.content:
|
||||||
|
logging.info("Message event triggered in KeywordNotify")
|
||||||
|
keywords_q: List[KeywordNotifyModel] = KeywordNotifyModel.select(
|
||||||
|
KeywordNotifyModel.user_id
|
||||||
|
).where(
|
||||||
|
KeywordNotifyModel.keyword == keyword,
|
||||||
|
KeywordNotifyModel.guild_id == guild.id,
|
||||||
|
)
|
||||||
|
|
||||||
|
users = [int(kw.user_id) for kw in keywords_q]
|
||||||
|
await self._send_message_to_user(users, keyword, event.message)
|
||||||
|
|
||||||
|
async def _send_message_to_user(
|
||||||
|
self, users: List[int], keyword: str, message: Message
|
||||||
|
):
|
||||||
|
quoted_message = "> " + message.content.replace("\n", "\n> ")
|
||||||
|
for user in users:
|
||||||
|
user = await self.bot.fetch_user(user)
|
||||||
|
if user is not None:
|
||||||
|
logging.info(
|
||||||
|
f"Attempting to send DM to {user.username}#{user.discriminator}"
|
||||||
|
)
|
||||||
|
await user.send(
|
||||||
|
(
|
||||||
|
f"A message has been sent with the keyword '{keyword}' in *{message.guild.name}*"
|
||||||
|
"\n\n"
|
||||||
|
f"{quoted_message}"
|
||||||
|
"\n"
|
||||||
|
f"{message.author.mention}"
|
||||||
|
),
|
||||||
|
components=Button(
|
||||||
|
style=ButtonStyles.LINK,
|
||||||
|
label="Go to message",
|
||||||
|
url=message.jump_url,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_user_keywords(guild_id, user_id) -> list[KeywordNotifyModel]:
|
||||||
|
current_keywords: List[KeywordNotifyModel] = (
|
||||||
|
KeywordNotifyModel.select()
|
||||||
|
.where(
|
||||||
|
KeywordNotifyModel.guild_id == guild_id,
|
||||||
|
KeywordNotifyModel.user_id == user_id,
|
||||||
|
)
|
||||||
|
.execute()
|
||||||
|
)
|
||||||
|
|
||||||
|
return current_keywords
|
||||||
|
|
||||||
|
|
||||||
|
def setup(client: Client):
|
||||||
|
KeywordNotifyModel.create_table()
|
||||||
|
x = KeywordNotify(client)
|
||||||
|
logging.info(f"{KeywordNotify.__name__} extension loaded.")
|
Loading…
Reference in New Issue