From 0ce6f37a5e9b4f79f76a2039cfd3fb66938fbbe2 Mon Sep 17 00:00:00 2001 From: CAG2Mark Date: Sun, 28 Apr 2024 23:18:06 +0800 Subject: [PATCH] done --- .gitignore | 6 ++ bot.py | 154 +++++++++++++++++++++++++++++++++++++++++++++++ guilddata.py | 113 ++++++++++++++++++++++++++++++++++ main.py | 28 +++++++++ removequeue.py | 51 ++++++++++++++++ requirements.txt | 3 + utils.py | 15 +++++ 7 files changed, 370 insertions(+) create mode 100644 .gitignore create mode 100644 bot.py create mode 100644 guilddata.py create mode 100644 main.py create mode 100644 removequeue.py create mode 100644 requirements.txt create mode 100644 utils.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2df07ea --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +/target +.env +__pycache__ +botenv +guilddata.json +removequeue.json \ No newline at end of file diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..95b9424 --- /dev/null +++ b/bot.py @@ -0,0 +1,154 @@ +import asyncio +import logging +import disnake +from guilddata import GuildData + +from disnake.ext import commands +import bcrypt +from datetime import datetime + +from removequeue import RemoveQueue + +def success_embed(message: str) -> disnake.Embed: + return disnake.Embed( + description=":white_check_mark: " + message, + color=disnake.Colour.green() + ) + +def error_embed(message: str) -> disnake.Embed: + return disnake.Embed( + description=":x: " + message, + color=disnake.Colour.red() + ) + +def warn_embed(message: str) -> disnake.Embed: + return disnake.Embed( + description=":warning: " + message, + color=disnake.Colour.yellow() + ) + +def hash_salt_password(password: bytes) -> str: + salt = bcrypt.gensalt() + pw_hash = bcrypt.hashpw(password, salt) + return pw_hash.hex() # note: bcryprt lib stores the salt with the hash + +class Bot: + logger = logging.getLogger("superuserbot") + + def __init__(self, token, guild_data: GuildData, remove_queue: RemoveQueue) -> None: + intents = disnake.Intents.default() + intents.members = True + + client = commands.InteractionBot(intents=intents) + self.client = client + self.data: GuildData = guild_data + self.rq = remove_queue + + self.initialized = False + + @client.event + async def on_ready(): + self.logger.info(f"Successfully logged in to Discord with username {client.user}") + self.initialized = True + + @client.slash_command(name='sudo', + description='Gives you the sudo role for a certain period of time.', + dm_permission=False) + async def sudo_command(inter: disnake.ApplicationCommandInteraction, + password: str = commands.Param(default="", name="password", description="Your password. Leave empty if you do not have a password on this guild yet."), + duration: int = commands.Param(default=5, name="duration", description="The time you receive the administrator role for in minutes.", gt=1)): + pw_bytes = password.encode('utf-8') + del password + pw_hash = self.data.get_user_password_hashsalt(inter.guild_id, inter.user.id) + pw_hash_bytes = bytes.fromhex(self.data.get_user_password_hashsalt(inter.guild_id, inter.user.id)) + + if not pw_hash or bcrypt.checkpw(pw_bytes, pw_hash_bytes): + sudoer_role = self.data.get_guild_sudoer_role(inter.guild_id) + has_sudoer_role = any([r.id == sudoer_role for r in inter.user.roles]) + + if not has_sudoer_role: + await inter.response.send_message(embed=error_embed(f"<@{inter.user.id}> is not in the sudoers file. This incident will be reported."), ephemeral=True) + return + + sudo_role_id = self.data.get_guild_role(inter.guild_id) + sudo_role = inter.guild.get_role(sudo_role_id) + + if not sudo_role: + inter.response.send_message(embed=error_embed("Misconfigured sudo role. Please contact an administrator."), ephemeral=True) + return + + await inter.user.add_roles(sudo_role) + + del_time = int(datetime.utcnow().timestamp()) + duration + self.rq.add(del_time, inter.user.id, inter.guild_id, sudo_role_id) + + await inter.response.send_message(embed=success_embed("You are now in sudo mode."), ephemeral=True) + else: + await inter.response.send_message(embed=error_embed("Incorrect password."), ephemeral=True, delete_after=4) + + @client.slash_command(name='set_password', + description='Set your password on this guild. DO NOT use the same password elsewhere.', + dm_permission=False) + async def set_password_command(inter: disnake.ApplicationCommandInteraction, + password: str = commands.Param(default="", name="password", description="The password to set.")): + pw = self.data.get_user_password_hashsalt(inter.guild_id, inter.user.id) + sudo_role = self.data.get_guild_role(inter.guild_id) + has_sudo_role = any([r.id == sudo_role for r in inter.user.roles]) + + if pw and not has_sudo_role: + await inter.response.send_message(embed=warn_embed("Please enter sudo mode using `/sudo` to change your password."), ephemeral=True) + return + + if not password.strip(): + self.data.set_user_password_hash(inter.guild_id, inter.user.id, "") + await inter.response.send_message(embed=warn_embed("You have set an empty password. This is not recommended."), ephemeral=True) + else: + pw_bytes = password.encode('utf-8') + del password + pw_hash = hash_salt_password(pw_bytes) + self.data.set_user_password_hash(inter.guild_id, inter.user.id, pw_hash) + await inter.response.send_message(embed=success_embed("Successfully set your password."), ephemeral=True) + + @client.slash_command(name='set_sudoers_role', + description='Sets the sudoers role. This role allows the bearer to use `/sudo`.', + dm_permission=False) + @commands.default_member_permissions(administrator=True) + async def set_sudoers_role(inter: disnake.ApplicationCommandInteraction, + role: disnake.Role = commands.Param(description="The sudoers rule.")): + self.data.set_guild_sudoer_role(inter.guild_id, role.id) + await inter.response.send_message(embed=success_embed(f"Set the sudoers role to **{role.name}**.")) + + @client.slash_command(name='set_sudo_role', + description='Sets the sudo role. This is the role someone is given then running `/sudo`.', + dm_permission=False) + @commands.default_member_permissions(administrator=True) + async def set_sudo_role_command(inter: disnake.ApplicationCommandInteraction, + role: disnake.Role = commands.Param(description="The role someone is given when running `/sudo`.")): + self.data.set_guild_role(inter.guild_id, role.id) + await inter.response.send_message(embed=success_embed(f"Set the sudo role to **{role.name}**.")) + + async def wrap(): + while True: + await asyncio.sleep(1) + cur_time = int(datetime.utcnow().timestamp()) + while self.rq.get_min_time() <= cur_time and self.rq.queue: + (_, user, guild, role) = self.rq.pop() + + guild = client.get_guild(guild) + if not guild: continue + + user = guild.get_member(user) + if not user: continue + + role = user.get_role(role) + if not role: continue + + await user.remove_roles(role) + + + client.loop.create_task(wrap()) + + client.run(token) + + + \ No newline at end of file diff --git a/guilddata.py b/guilddata.py new file mode 100644 index 0000000..806874a --- /dev/null +++ b/guilddata.py @@ -0,0 +1,113 @@ +import os +import json + +from utils import mutex, datawrite + +from threading import Lock + +lock = Lock() +filelock = Lock() + +GUILD_ROLES = "guild_roles" +GUILD_SUDOERS_ROLES = "guild_sudoers_roles" +GUILD_PASSWORDS = "guild_passwords" + +FILE = "guilddata.json" + + +def argstostr(func): + def wrapper(self, *args, **kwargs): + result = func(self, *[str(x) for x in args], **kwargs) + self.export() + return result + return wrapper + +# converts all but the last argument to a string +def inpargstostr(func): + def wrapper(self, *args, **kwargs): + newargs = [] + for i, a in enumerate(args): + if i < len(args) - 1: + newargs.append(str(a)) + else: + newargs.append(a) + + result = func(self, *newargs, **kwargs) + self.export() + return result + return wrapper + +class GuildData: + def __init__(self): + self.map = {} + self.map[GUILD_ROLES] = {} + self.map[GUILD_SUDOERS_ROLES] = {} + self.map[GUILD_PASSWORDS] = {} + + def roles(self): + return self.map[GUILD_ROLES] + + def sudoers_roles(self): + return self.map[GUILD_SUDOERS_ROLES] + + def passwords(self): + return self.map[GUILD_PASSWORDS] + + @argstostr + @mutex(lock=lock) + def get_guild_role(self, guild) -> int: + if guild in self.roles(): + return self.roles()[guild] + return None + + @inpargstostr + @mutex(lock=lock) + @datawrite + def set_guild_role(self, guild: int, role: int): + self.roles()[guild] = role + + @argstostr + @mutex(lock=lock) + def get_guild_sudoer_role(self, guild) -> int: + if guild in self.sudoers_roles(): + return self.sudoers_roles()[guild] + return None + + @inpargstostr + @mutex(lock=lock) + @datawrite + def set_guild_sudoer_role(self, guild: int, role: int): + self.sudoers_roles()[guild] = role + + @argstostr + @mutex(lock=lock) + def get_user_password_hashsalt(self, guild: int, user: int) -> str: + if not guild in self.passwords(): + return None + + if not user in self.passwords()[guild]: + return None + + return self.passwords()[guild][user] + + @inpargstostr + @mutex(lock=lock) + @datawrite + def set_user_password_hash(self, guild: int, user: int, hashsalt: str): + if not guild in self.passwords(): + self.passwords()[guild] = {} + + self.passwords()[guild][user] = hashsalt + + @mutex(lock=filelock) + def export(self): + with open(FILE, 'w') as f: + json.dump(self.map, f) + + @mutex(lock=filelock) + def load(self): + if not os.path.exists(FILE): + return + + with open(FILE, 'r') as f: + self.map = json.load(f) \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..b6bb15b --- /dev/null +++ b/main.py @@ -0,0 +1,28 @@ +import logging +import os +import bot +from dotenv import load_dotenv + +from guilddata import GuildData +from removequeue import RemoveQueue + +load_dotenv() + +TOKEN = os.getenv('TOKEN') +logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + +logging.getLogger('disnake').setLevel(logging.WARNING) +logging.getLogger('logger.http').setLevel(logging.WARNING) + +logger = logging.getLogger("superuserbot") +logger.setLevel(logging.DEBUG) + +logger.info("Started superuserbot") + +gdata = GuildData() +gdata.load() + +rq = RemoveQueue() +rq.load() + +instance = bot.Bot(TOKEN, gdata, rq) diff --git a/removequeue.py b/removequeue.py new file mode 100644 index 0000000..217233f --- /dev/null +++ b/removequeue.py @@ -0,0 +1,51 @@ +from threading import Lock +from utils import mutex, datawrite + +import heapq + +import os + +import json + +FILE="removequeue.json" + +lock = Lock() +filelock = Lock() + +# use a min-heap to get the next removal time +class RemoveQueue: + def __init__(self): + self.queue = [] + + @mutex(lock=lock) + @datawrite + def add(self, del_time: int, user: int, guild: int, role: int): + heapq.heappush(self.queue, (del_time, user, guild, role)) + + @mutex(lock=lock) + def get_min_time(self): + if self.queue: + return self.queue[0][0] + return -1 + + @mutex(lock=lock) + @datawrite + def pop(self): + if self.queue: + return heapq.heappop(self.queue) + return None + + @mutex(lock=filelock) + def export(self): + with open(FILE, 'w') as f: + json.dump(self.queue, f) + + @mutex(lock=filelock) + def load(self): + if not os.path.exists(FILE): + return + + with open(FILE, 'r') as f: + self.queue = json.load(f) + + heapq.heapify(self.queue) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..be3c3bf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +disnake +python-dotenv +bcrypt \ No newline at end of file diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..53d4472 --- /dev/null +++ b/utils.py @@ -0,0 +1,15 @@ +def datawrite(func): + def wrapper(self, *args, **kwargs): + result = func(self, *args, **kwargs) + self.export() + return result + return wrapper + +def mutex(*args_, **kwargs_): + def w1(func): + def wrapper(self, *args, **kwargs): + with kwargs_["lock"]: + val = func(self, *args, **kwargs) + return val + return wrapper + return w1 \ No newline at end of file