|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import asyncio |
|
import time |
|
import uuid |
|
|
|
from telethon import Button |
|
from telethon.errors.rpcerrorlist import UserNotParticipantError |
|
from telethon.tl import functions, types |
|
|
|
try: |
|
from .. import _ult_cache |
|
from .._misc import SUDO_M |
|
except ImportError: |
|
_ult_cache = {} |
|
SUDO_M = None |
|
|
|
|
|
def ban_time(time_str): |
|
"""Simplify ban time from text""" |
|
if not any(time_str.endswith(unit) for unit in ("s", "m", "h", "d")): |
|
time_str += "s" |
|
unit = time_str[-1] |
|
time_int = time_str[:-1].strip() |
|
if not time_int.isdigit(): |
|
raise Exception("Invalid time amount specified.") |
|
to_return = "" |
|
if unit == "s": |
|
to_return = int(time.time() + int(time_int)) |
|
elif unit == "m": |
|
to_return = int(time.time() + int(time_int) * 60) |
|
elif unit == "h": |
|
to_return = int(time.time() + int(time_int) * 60 * 60) |
|
elif unit == "d": |
|
to_return = int(time.time() + int(time_int) * 24 * 60 * 60) |
|
return to_return |
|
|
|
|
|
|
|
|
|
|
|
async def _callback_check(event): |
|
id_ = str(uuid.uuid1()).split("-")[0] |
|
time.time() |
|
msg = await event.reply( |
|
"Click Below Button to prove self as Admin!", |
|
buttons=Button.inline("Click Me", f"cc_{id_}"), |
|
) |
|
if not _ult_cache.get("admin_callback"): |
|
_ult_cache.update({"admin_callback": {id_: None}}) |
|
else: |
|
_ult_cache["admin_callback"].update({id_: None}) |
|
while not _ult_cache["admin_callback"].get(id_): |
|
await asyncio.sleep(1) |
|
key = _ult_cache.get("admin_callback", {}).get(id_) |
|
del _ult_cache["admin_callback"][id_] |
|
return key |
|
|
|
|
|
async def get_update_linked_chat(event): |
|
if _ult_cache.get("LINKED_CHATS") and _ult_cache["LINKED_CHATS"].get(event.chat_id): |
|
_ignore = _ult_cache["LINKED_CHATS"][event.chat_id]["linked_chat"] |
|
else: |
|
channel = await event.client( |
|
functions.channels.GetFullChannelRequest(event.chat_id) |
|
) |
|
_ignore = channel.full_chat.linked_chat_id |
|
if _ult_cache.get("LINKED_CHATS"): |
|
_ult_cache["LINKED_CHATS"].update({event.chat_id: {"linked_chat": _ignore}}) |
|
else: |
|
_ult_cache.update( |
|
{"LINKED_CHATS": {event.chat_id: {"linked_chat": _ignore}}} |
|
) |
|
return _ignore |
|
|
|
|
|
async def admin_check(event, require=None, silent: bool = False): |
|
if SUDO_M and event.sender_id in SUDO_M.owner_and_sudos(): |
|
return True |
|
callback = None |
|
|
|
|
|
if ( |
|
isinstance(event.sender, (types.Chat, types.Channel)) |
|
and event.sender_id == event.chat_id |
|
): |
|
if not require: |
|
return True |
|
callback = True |
|
if isinstance(event.sender, types.Channel): |
|
_ignore = await get_update_linked_chat(event) |
|
if _ignore and event.sender.id == _ignore: |
|
return False |
|
callback = True |
|
if callback: |
|
if silent: |
|
|
|
|
|
return |
|
get_ = await _callback_check(event) |
|
if not get_: |
|
return |
|
user, perms = get_ |
|
event._sender_id = user.id |
|
event._sender = user |
|
else: |
|
user = event.sender |
|
try: |
|
perms = await event.client.get_permissions(event.chat_id, user.id) |
|
except UserNotParticipantError: |
|
if not silent: |
|
await event.reply("You need to join this chat First!") |
|
return False |
|
if not perms.is_admin: |
|
if not silent: |
|
await event.eor("Only Admins can use this command!", time=8) |
|
return |
|
if require and not getattr(perms, require, False): |
|
if not silent: |
|
await event.eor(f"You are missing the right of `{require}`", time=8) |
|
return False |
|
return True |
|
|
|
|
|
|
|
|
|
|
|
def lock_unlock(query, lock=True): |
|
""" |
|
`Used in locks plugin` |
|
Is there any better way to do this? |
|
""" |
|
rights = types.ChatBannedRights(None) |
|
_do = lock |
|
if query == "msgs": |
|
for i in ["send_messages", "invite_users", "pin_messages" "change_info"]: |
|
setattr(rights, i, _do) |
|
elif query == "media": |
|
setattr(rights, "send_media", _do) |
|
elif query == "sticker": |
|
setattr(rights, "send_stickers", _do) |
|
elif query == "gif": |
|
setattr(rights, "send_gifs", _do) |
|
elif query == "games": |
|
setattr(rights, "send_games", _do) |
|
elif query == "inline": |
|
setattr(rights, "send_inline", _do) |
|
elif query == "polls": |
|
setattr(rights, "send_polls", _do) |
|
elif query == "invites": |
|
setattr(rights, "invite_users", _do) |
|
elif query == "pin": |
|
setattr(rights, "pin_messages", _do) |
|
elif query == "changeinfo": |
|
setattr(rights, "change_info", _do) |
|
else: |
|
return None |
|
return rights |
|
|
|
|
|
|
|
|