2024-04-28 17:18:06 +02:00
import asyncio
import logging
import disnake
from guilddata import GuildData
from disnake . ext import commands
import bcrypt
from datetime import datetime
from removequeue import RemoveQueue
def success_embed ( message : str ) - > disnake . Embed :
return disnake . Embed (
description = " :white_check_mark: " + message ,
color = disnake . Colour . green ( )
)
def error_embed ( message : str ) - > disnake . Embed :
return disnake . Embed (
description = " :x: " + message ,
color = disnake . Colour . red ( )
)
def warn_embed ( message : str ) - > disnake . Embed :
return disnake . Embed (
description = " :warning: " + message ,
color = disnake . Colour . yellow ( )
)
def hash_salt_password ( password : bytes ) - > str :
salt = bcrypt . gensalt ( )
pw_hash = bcrypt . hashpw ( password , salt )
return pw_hash . hex ( ) # note: bcryprt lib stores the salt with the hash
class Bot :
logger = logging . getLogger ( " superuserbot " )
def __init__ ( self , token , guild_data : GuildData , remove_queue : RemoveQueue ) - > None :
2024-04-29 09:02:17 +02:00
client = commands . InteractionBot ( )
2024-04-28 17:18:06 +02:00
self . client = client
self . data : GuildData = guild_data
self . rq = remove_queue
self . initialized = False
2024-05-12 19:27:16 +02:00
self . connected = False
2024-04-28 17:18:06 +02:00
@client.event
async def on_ready ( ) :
self . logger . info ( f " Successfully logged in to Discord with username { client . user } " )
self . initialized = True
2024-05-12 19:27:16 +02:00
@client.event
async def on_connect ( ) :
self . logger . info ( f " Succesfully reconnected " )
self . connected = True
@client.event
async def on_disconnect ( ) :
self . connected = False
2024-04-28 17:18:06 +02:00
@client.slash_command ( name = ' sudo ' ,
description = ' Gives you the sudo role for a certain period of time. ' ,
dm_permission = False )
async def sudo_command ( inter : disnake . ApplicationCommandInteraction ,
password : str = commands . Param ( default = " " , name = " password " , description = " Your password. Leave empty if you do not have a password on this guild yet. " ) ,
2024-04-28 17:23:49 +02:00
duration : int = commands . Param ( default = 5 , name = " duration " , description = " The time you receive the administrator role for in minutes. " , gt = 1 , lt = 20 ) ) :
2024-04-28 17:18:06 +02:00
pw_bytes = password . encode ( ' utf-8 ' )
del password
pw_hash = self . data . get_user_password_hashsalt ( inter . guild_id , inter . user . id )
2024-04-28 17:41:43 +02:00
if pw_hash :
pw_hash_bytes = bytes . fromhex ( self . data . get_user_password_hashsalt ( inter . guild_id , inter . user . id ) )
2024-04-28 17:18:06 +02:00
if not pw_hash or bcrypt . checkpw ( pw_bytes , pw_hash_bytes ) :
sudoer_role = self . data . get_guild_sudoer_role ( inter . guild_id )
has_sudoer_role = any ( [ r . id == sudoer_role for r in inter . user . roles ] )
if not has_sudoer_role :
await inter . response . send_message ( embed = error_embed ( f " <@ { inter . user . id } > is not in the sudoers file. This incident will be reported. " ) , ephemeral = True )
return
sudo_role_id = self . data . get_guild_role ( inter . guild_id )
sudo_role = inter . guild . get_role ( sudo_role_id )
if not sudo_role :
2024-04-28 18:15:32 +02:00
await inter . response . send_message ( embed = error_embed ( " Misconfigured sudo role. Please contact an administrator. " ) , ephemeral = True )
2024-04-28 17:18:06 +02:00
return
2024-04-29 09:02:17 +02:00
try :
await inter . user . add_roles ( sudo_role )
except disnake . errors . Forbidden :
await inter . response . send_message ( embed = error_embed ( " I do not have permission to grant you the sudo role. Please contact an administrator. " ) , ephemeral = True )
return ;
except disnake . errors . NotFound :
await inter . response . send_message ( embed = error_embed ( " Could not find the sudo role. Please contact an administrator. " ) , ephemeral = True )
return ;
except :
await inter . response . send_message ( embed = error_embed ( " Unkown error when giving you the sudo role. Please contact an administrator. " ) , ephemeral = True )
return ;
2024-04-29 09:02:54 +02:00
del_time = int ( datetime . utcnow ( ) . timestamp ( ) ) + duration * 60
2024-04-28 17:18:06 +02:00
self . rq . add ( del_time , inter . user . id , inter . guild_id , sudo_role_id )
await inter . response . send_message ( embed = success_embed ( " You are now in sudo mode. " ) , ephemeral = True )
else :
await inter . response . send_message ( embed = error_embed ( " Incorrect password. " ) , ephemeral = True , delete_after = 4 )
@client.slash_command ( name = ' set_password ' ,
description = ' Set your password on this guild. DO NOT use the same password elsewhere. ' ,
dm_permission = False )
async def set_password_command ( inter : disnake . ApplicationCommandInteraction ,
2024-04-28 18:08:15 +02:00
password : str = commands . Param ( default = " " , name = " password " , description = " The password to set. " , min_length = 6 , max_length = 51 ) ) :
2024-04-28 17:18:06 +02:00
pw = self . data . get_user_password_hashsalt ( inter . guild_id , inter . user . id )
sudo_role = self . data . get_guild_role ( inter . guild_id )
has_sudo_role = any ( [ r . id == sudo_role for r in inter . user . roles ] )
if pw and not has_sudo_role :
await inter . response . send_message ( embed = warn_embed ( " Please enter sudo mode using `/sudo` to change your password. " ) , ephemeral = True )
return
if not password . strip ( ) :
self . data . set_user_password_hash ( inter . guild_id , inter . user . id , " " )
await inter . response . send_message ( embed = warn_embed ( " You have set an empty password. This is not recommended. " ) , ephemeral = True )
else :
pw_bytes = password . encode ( ' utf-8 ' )
del password
pw_hash = hash_salt_password ( pw_bytes )
self . data . set_user_password_hash ( inter . guild_id , inter . user . id , pw_hash )
await inter . response . send_message ( embed = success_embed ( " Successfully set your password. " ) , ephemeral = True )
@client.slash_command ( name = ' set_sudoers_role ' ,
description = ' Sets the sudoers role. This role allows the bearer to use `/sudo`. ' ,
dm_permission = False )
@commands.default_member_permissions ( administrator = True )
async def set_sudoers_role ( inter : disnake . ApplicationCommandInteraction ,
role : disnake . Role = commands . Param ( description = " The sudoers rule. " ) ) :
self . data . set_guild_sudoer_role ( inter . guild_id , role . id )
await inter . response . send_message ( embed = success_embed ( f " Set the sudoers role to ** { role . name } **. " ) )
@client.slash_command ( name = ' set_sudo_role ' ,
description = ' Sets the sudo role. This is the role someone is given then running `/sudo`. ' ,
dm_permission = False )
@commands.default_member_permissions ( administrator = True )
async def set_sudo_role_command ( inter : disnake . ApplicationCommandInteraction ,
role : disnake . Role = commands . Param ( description = " The role someone is given when running `/sudo`. " ) ) :
self . data . set_guild_role ( inter . guild_id , role . id )
await inter . response . send_message ( embed = success_embed ( f " Set the sudo role to ** { role . name } **. " ) )
async def wrap ( ) :
while True :
await asyncio . sleep ( 1 )
2024-05-12 19:27:16 +02:00
2024-04-28 17:18:06 +02:00
cur_time = int ( datetime . utcnow ( ) . timestamp ( ) )
while self . rq . get_min_time ( ) < = cur_time and self . rq . queue :
( _ , user , guild , role ) = self . rq . pop ( )
2024-05-12 19:27:16 +02:00
try :
guild = client . get_guild ( guild )
if not guild : continue
2024-04-28 17:18:06 +02:00
2024-05-12 19:27:16 +02:00
user = await guild . get_or_fetch_member ( user )
if not user : continue
2024-04-28 17:23:49 +02:00
2024-05-12 19:27:16 +02:00
role = user . get_role ( role )
if not role : continue
await user . remove_roles ( role )
except Exception as e :
self . logger . error ( " Could not remove role from user. Trying again in 10 seconds. The error is as follows: " )
self . logger . exception ( e )
self . rq . add ( ( cur_time + 10 , user , guild , role ) )
2024-04-28 17:18:06 +02:00
client . loop . create_task ( wrap ( ) )
client . run ( token )