mirror of
https://github.com/CAG2Mark/SuperUserBot.git
synced 2025-01-13 15:56:29 +01:00
done
This commit is contained in:
commit
0ce6f37a5e
7 changed files with 370 additions and 0 deletions
6
.gitignore
vendored
Normal file
6
.gitignore
vendored
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
/target
|
||||||
|
.env
|
||||||
|
__pycache__
|
||||||
|
botenv
|
||||||
|
guilddata.json
|
||||||
|
removequeue.json
|
154
bot.py
Normal file
154
bot.py
Normal file
|
@ -0,0 +1,154 @@
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import disnake
|
||||||
|
from guilddata import GuildData
|
||||||
|
|
||||||
|
from disnake.ext import commands
|
||||||
|
import bcrypt
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from removequeue import RemoveQueue
|
||||||
|
|
||||||
|
def success_embed(message: str) -> disnake.Embed:
|
||||||
|
return disnake.Embed(
|
||||||
|
description=":white_check_mark: " + message,
|
||||||
|
color=disnake.Colour.green()
|
||||||
|
)
|
||||||
|
|
||||||
|
def error_embed(message: str) -> disnake.Embed:
|
||||||
|
return disnake.Embed(
|
||||||
|
description=":x: " + message,
|
||||||
|
color=disnake.Colour.red()
|
||||||
|
)
|
||||||
|
|
||||||
|
def warn_embed(message: str) -> disnake.Embed:
|
||||||
|
return disnake.Embed(
|
||||||
|
description=":warning: " + message,
|
||||||
|
color=disnake.Colour.yellow()
|
||||||
|
)
|
||||||
|
|
||||||
|
def hash_salt_password(password: bytes) -> str:
|
||||||
|
salt = bcrypt.gensalt()
|
||||||
|
pw_hash = bcrypt.hashpw(password, salt)
|
||||||
|
return pw_hash.hex() # note: bcryprt lib stores the salt with the hash
|
||||||
|
|
||||||
|
class Bot:
|
||||||
|
logger = logging.getLogger("superuserbot")
|
||||||
|
|
||||||
|
def __init__(self, token, guild_data: GuildData, remove_queue: RemoveQueue) -> None:
|
||||||
|
intents = disnake.Intents.default()
|
||||||
|
intents.members = True
|
||||||
|
|
||||||
|
client = commands.InteractionBot(intents=intents)
|
||||||
|
self.client = client
|
||||||
|
self.data: GuildData = guild_data
|
||||||
|
self.rq = remove_queue
|
||||||
|
|
||||||
|
self.initialized = False
|
||||||
|
|
||||||
|
@client.event
|
||||||
|
async def on_ready():
|
||||||
|
self.logger.info(f"Successfully logged in to Discord with username {client.user}")
|
||||||
|
self.initialized = True
|
||||||
|
|
||||||
|
@client.slash_command(name='sudo',
|
||||||
|
description='Gives you the sudo role for a certain period of time.',
|
||||||
|
dm_permission=False)
|
||||||
|
async def sudo_command(inter: disnake.ApplicationCommandInteraction,
|
||||||
|
password: str = commands.Param(default="", name="password", description="Your password. Leave empty if you do not have a password on this guild yet."),
|
||||||
|
duration: int = commands.Param(default=5, name="duration", description="The time you receive the administrator role for in minutes.", gt=1)):
|
||||||
|
pw_bytes = password.encode('utf-8')
|
||||||
|
del password
|
||||||
|
pw_hash = self.data.get_user_password_hashsalt(inter.guild_id, inter.user.id)
|
||||||
|
pw_hash_bytes = bytes.fromhex(self.data.get_user_password_hashsalt(inter.guild_id, inter.user.id))
|
||||||
|
|
||||||
|
if not pw_hash or bcrypt.checkpw(pw_bytes, pw_hash_bytes):
|
||||||
|
sudoer_role = self.data.get_guild_sudoer_role(inter.guild_id)
|
||||||
|
has_sudoer_role = any([r.id == sudoer_role for r in inter.user.roles])
|
||||||
|
|
||||||
|
if not has_sudoer_role:
|
||||||
|
await inter.response.send_message(embed=error_embed(f"<@{inter.user.id}> is not in the sudoers file. This incident will be reported."), ephemeral=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
sudo_role_id = self.data.get_guild_role(inter.guild_id)
|
||||||
|
sudo_role = inter.guild.get_role(sudo_role_id)
|
||||||
|
|
||||||
|
if not sudo_role:
|
||||||
|
inter.response.send_message(embed=error_embed("Misconfigured sudo role. Please contact an administrator."), ephemeral=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
await inter.user.add_roles(sudo_role)
|
||||||
|
|
||||||
|
del_time = int(datetime.utcnow().timestamp()) + duration
|
||||||
|
self.rq.add(del_time, inter.user.id, inter.guild_id, sudo_role_id)
|
||||||
|
|
||||||
|
await inter.response.send_message(embed=success_embed("You are now in sudo mode."), ephemeral=True)
|
||||||
|
else:
|
||||||
|
await inter.response.send_message(embed=error_embed("Incorrect password."), ephemeral=True, delete_after=4)
|
||||||
|
|
||||||
|
@client.slash_command(name='set_password',
|
||||||
|
description='Set your password on this guild. DO NOT use the same password elsewhere.',
|
||||||
|
dm_permission=False)
|
||||||
|
async def set_password_command(inter: disnake.ApplicationCommandInteraction,
|
||||||
|
password: str = commands.Param(default="", name="password", description="The password to set.")):
|
||||||
|
pw = self.data.get_user_password_hashsalt(inter.guild_id, inter.user.id)
|
||||||
|
sudo_role = self.data.get_guild_role(inter.guild_id)
|
||||||
|
has_sudo_role = any([r.id == sudo_role for r in inter.user.roles])
|
||||||
|
|
||||||
|
if pw and not has_sudo_role:
|
||||||
|
await inter.response.send_message(embed=warn_embed("Please enter sudo mode using `/sudo` to change your password."), ephemeral=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
if not password.strip():
|
||||||
|
self.data.set_user_password_hash(inter.guild_id, inter.user.id, "")
|
||||||
|
await inter.response.send_message(embed=warn_embed("You have set an empty password. This is not recommended."), ephemeral=True)
|
||||||
|
else:
|
||||||
|
pw_bytes = password.encode('utf-8')
|
||||||
|
del password
|
||||||
|
pw_hash = hash_salt_password(pw_bytes)
|
||||||
|
self.data.set_user_password_hash(inter.guild_id, inter.user.id, pw_hash)
|
||||||
|
await inter.response.send_message(embed=success_embed("Successfully set your password."), ephemeral=True)
|
||||||
|
|
||||||
|
@client.slash_command(name='set_sudoers_role',
|
||||||
|
description='Sets the sudoers role. This role allows the bearer to use `/sudo`.',
|
||||||
|
dm_permission=False)
|
||||||
|
@commands.default_member_permissions(administrator=True)
|
||||||
|
async def set_sudoers_role(inter: disnake.ApplicationCommandInteraction,
|
||||||
|
role: disnake.Role = commands.Param(description="The sudoers rule.")):
|
||||||
|
self.data.set_guild_sudoer_role(inter.guild_id, role.id)
|
||||||
|
await inter.response.send_message(embed=success_embed(f"Set the sudoers role to **{role.name}**."))
|
||||||
|
|
||||||
|
@client.slash_command(name='set_sudo_role',
|
||||||
|
description='Sets the sudo role. This is the role someone is given then running `/sudo`.',
|
||||||
|
dm_permission=False)
|
||||||
|
@commands.default_member_permissions(administrator=True)
|
||||||
|
async def set_sudo_role_command(inter: disnake.ApplicationCommandInteraction,
|
||||||
|
role: disnake.Role = commands.Param(description="The role someone is given when running `/sudo`.")):
|
||||||
|
self.data.set_guild_role(inter.guild_id, role.id)
|
||||||
|
await inter.response.send_message(embed=success_embed(f"Set the sudo role to **{role.name}**."))
|
||||||
|
|
||||||
|
async def wrap():
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
cur_time = int(datetime.utcnow().timestamp())
|
||||||
|
while self.rq.get_min_time() <= cur_time and self.rq.queue:
|
||||||
|
(_, user, guild, role) = self.rq.pop()
|
||||||
|
|
||||||
|
guild = client.get_guild(guild)
|
||||||
|
if not guild: continue
|
||||||
|
|
||||||
|
user = guild.get_member(user)
|
||||||
|
if not user: continue
|
||||||
|
|
||||||
|
role = user.get_role(role)
|
||||||
|
if not role: continue
|
||||||
|
|
||||||
|
await user.remove_roles(role)
|
||||||
|
|
||||||
|
|
||||||
|
client.loop.create_task(wrap())
|
||||||
|
|
||||||
|
client.run(token)
|
||||||
|
|
||||||
|
|
||||||
|
|
113
guilddata.py
Normal file
113
guilddata.py
Normal file
|
@ -0,0 +1,113 @@
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
|
||||||
|
from utils import mutex, datawrite
|
||||||
|
|
||||||
|
from threading import Lock
|
||||||
|
|
||||||
|
lock = Lock()
|
||||||
|
filelock = Lock()
|
||||||
|
|
||||||
|
GUILD_ROLES = "guild_roles"
|
||||||
|
GUILD_SUDOERS_ROLES = "guild_sudoers_roles"
|
||||||
|
GUILD_PASSWORDS = "guild_passwords"
|
||||||
|
|
||||||
|
FILE = "guilddata.json"
|
||||||
|
|
||||||
|
|
||||||
|
def argstostr(func):
|
||||||
|
def wrapper(self, *args, **kwargs):
|
||||||
|
result = func(self, *[str(x) for x in args], **kwargs)
|
||||||
|
self.export()
|
||||||
|
return result
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
# converts all but the last argument to a string
|
||||||
|
def inpargstostr(func):
|
||||||
|
def wrapper(self, *args, **kwargs):
|
||||||
|
newargs = []
|
||||||
|
for i, a in enumerate(args):
|
||||||
|
if i < len(args) - 1:
|
||||||
|
newargs.append(str(a))
|
||||||
|
else:
|
||||||
|
newargs.append(a)
|
||||||
|
|
||||||
|
result = func(self, *newargs, **kwargs)
|
||||||
|
self.export()
|
||||||
|
return result
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
class GuildData:
|
||||||
|
def __init__(self):
|
||||||
|
self.map = {}
|
||||||
|
self.map[GUILD_ROLES] = {}
|
||||||
|
self.map[GUILD_SUDOERS_ROLES] = {}
|
||||||
|
self.map[GUILD_PASSWORDS] = {}
|
||||||
|
|
||||||
|
def roles(self):
|
||||||
|
return self.map[GUILD_ROLES]
|
||||||
|
|
||||||
|
def sudoers_roles(self):
|
||||||
|
return self.map[GUILD_SUDOERS_ROLES]
|
||||||
|
|
||||||
|
def passwords(self):
|
||||||
|
return self.map[GUILD_PASSWORDS]
|
||||||
|
|
||||||
|
@argstostr
|
||||||
|
@mutex(lock=lock)
|
||||||
|
def get_guild_role(self, guild) -> int:
|
||||||
|
if guild in self.roles():
|
||||||
|
return self.roles()[guild]
|
||||||
|
return None
|
||||||
|
|
||||||
|
@inpargstostr
|
||||||
|
@mutex(lock=lock)
|
||||||
|
@datawrite
|
||||||
|
def set_guild_role(self, guild: int, role: int):
|
||||||
|
self.roles()[guild] = role
|
||||||
|
|
||||||
|
@argstostr
|
||||||
|
@mutex(lock=lock)
|
||||||
|
def get_guild_sudoer_role(self, guild) -> int:
|
||||||
|
if guild in self.sudoers_roles():
|
||||||
|
return self.sudoers_roles()[guild]
|
||||||
|
return None
|
||||||
|
|
||||||
|
@inpargstostr
|
||||||
|
@mutex(lock=lock)
|
||||||
|
@datawrite
|
||||||
|
def set_guild_sudoer_role(self, guild: int, role: int):
|
||||||
|
self.sudoers_roles()[guild] = role
|
||||||
|
|
||||||
|
@argstostr
|
||||||
|
@mutex(lock=lock)
|
||||||
|
def get_user_password_hashsalt(self, guild: int, user: int) -> str:
|
||||||
|
if not guild in self.passwords():
|
||||||
|
return None
|
||||||
|
|
||||||
|
if not user in self.passwords()[guild]:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return self.passwords()[guild][user]
|
||||||
|
|
||||||
|
@inpargstostr
|
||||||
|
@mutex(lock=lock)
|
||||||
|
@datawrite
|
||||||
|
def set_user_password_hash(self, guild: int, user: int, hashsalt: str):
|
||||||
|
if not guild in self.passwords():
|
||||||
|
self.passwords()[guild] = {}
|
||||||
|
|
||||||
|
self.passwords()[guild][user] = hashsalt
|
||||||
|
|
||||||
|
@mutex(lock=filelock)
|
||||||
|
def export(self):
|
||||||
|
with open(FILE, 'w') as f:
|
||||||
|
json.dump(self.map, f)
|
||||||
|
|
||||||
|
@mutex(lock=filelock)
|
||||||
|
def load(self):
|
||||||
|
if not os.path.exists(FILE):
|
||||||
|
return
|
||||||
|
|
||||||
|
with open(FILE, 'r') as f:
|
||||||
|
self.map = json.load(f)
|
28
main.py
Normal file
28
main.py
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import bot
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
from guilddata import GuildData
|
||||||
|
from removequeue import RemoveQueue
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
TOKEN = os.getenv('TOKEN')
|
||||||
|
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||||
|
|
||||||
|
logging.getLogger('disnake').setLevel(logging.WARNING)
|
||||||
|
logging.getLogger('logger.http').setLevel(logging.WARNING)
|
||||||
|
|
||||||
|
logger = logging.getLogger("superuserbot")
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
logger.info("Started superuserbot")
|
||||||
|
|
||||||
|
gdata = GuildData()
|
||||||
|
gdata.load()
|
||||||
|
|
||||||
|
rq = RemoveQueue()
|
||||||
|
rq.load()
|
||||||
|
|
||||||
|
instance = bot.Bot(TOKEN, gdata, rq)
|
51
removequeue.py
Normal file
51
removequeue.py
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
from threading import Lock
|
||||||
|
from utils import mutex, datawrite
|
||||||
|
|
||||||
|
import heapq
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
FILE="removequeue.json"
|
||||||
|
|
||||||
|
lock = Lock()
|
||||||
|
filelock = Lock()
|
||||||
|
|
||||||
|
# use a min-heap to get the next removal time
|
||||||
|
class RemoveQueue:
|
||||||
|
def __init__(self):
|
||||||
|
self.queue = []
|
||||||
|
|
||||||
|
@mutex(lock=lock)
|
||||||
|
@datawrite
|
||||||
|
def add(self, del_time: int, user: int, guild: int, role: int):
|
||||||
|
heapq.heappush(self.queue, (del_time, user, guild, role))
|
||||||
|
|
||||||
|
@mutex(lock=lock)
|
||||||
|
def get_min_time(self):
|
||||||
|
if self.queue:
|
||||||
|
return self.queue[0][0]
|
||||||
|
return -1
|
||||||
|
|
||||||
|
@mutex(lock=lock)
|
||||||
|
@datawrite
|
||||||
|
def pop(self):
|
||||||
|
if self.queue:
|
||||||
|
return heapq.heappop(self.queue)
|
||||||
|
return None
|
||||||
|
|
||||||
|
@mutex(lock=filelock)
|
||||||
|
def export(self):
|
||||||
|
with open(FILE, 'w') as f:
|
||||||
|
json.dump(self.queue, f)
|
||||||
|
|
||||||
|
@mutex(lock=filelock)
|
||||||
|
def load(self):
|
||||||
|
if not os.path.exists(FILE):
|
||||||
|
return
|
||||||
|
|
||||||
|
with open(FILE, 'r') as f:
|
||||||
|
self.queue = json.load(f)
|
||||||
|
|
||||||
|
heapq.heapify(self.queue)
|
3
requirements.txt
Normal file
3
requirements.txt
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
disnake
|
||||||
|
python-dotenv
|
||||||
|
bcrypt
|
15
utils.py
Normal file
15
utils.py
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
def datawrite(func):
|
||||||
|
def wrapper(self, *args, **kwargs):
|
||||||
|
result = func(self, *args, **kwargs)
|
||||||
|
self.export()
|
||||||
|
return result
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
def mutex(*args_, **kwargs_):
|
||||||
|
def w1(func):
|
||||||
|
def wrapper(self, *args, **kwargs):
|
||||||
|
with kwargs_["lock"]:
|
||||||
|
val = func(self, *args, **kwargs)
|
||||||
|
return val
|
||||||
|
return wrapper
|
||||||
|
return w1
|
Loading…
Reference in a new issue