File size: 5,242 Bytes
618430a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# Ultroid - UserBot
# Copyright (C) 2021-2025 TeamUltroid
#
# This file is a part of < https://github.com/TeamUltroid/Ultroid/ >
# Please read the GNU Affero General Public License in
# <https://github.com/TeamUltroid/pyUltroid/blob/main/LICENSE>.
import asyncio
import time
import uuid
from telethon import Button
from telethon.errors.rpcerrorlist import UserNotParticipantError
from telethon.tl import functions, types
try:
from .. import _ult_cache
from .._misc import SUDO_M
except ImportError:
_ult_cache = {}
SUDO_M = None
def ban_time(time_str):
    """Convert a relative duration such as ``"30m"`` into a Unix timestamp.

    The text is a non-negative whole number followed by an optional unit
    suffix: ``s`` (seconds), ``m`` (minutes), ``h`` (hours) or ``d`` (days).
    When the text does not end in one of those suffixes, seconds are assumed.

    Args:
        time_str: Duration text, e.g. ``"45"``, ``"10m"``, ``"2h"``, ``"7d"``.

    Returns:
        int: ``time.time()`` plus the duration, truncated to whole seconds.

    Raises:
        ValueError: If the amount part is not a whole decimal number
            (message: "Invalid time amount specified.").
    """
    unit_seconds = {"s": 1, "m": 60, "h": 60 * 60, "d": 24 * 60 * 60}
    if not time_str.endswith(tuple(unit_seconds)):
        time_str += "s"
    unit = time_str[-1]
    amount = time_str[:-1].strip()
    # isdecimal(), unlike isdigit(), rejects characters such as "²" that
    # int() cannot parse, so bad input always yields the error below.
    if not amount.isdecimal():
        raise ValueError("Invalid time amount specified.")
    return int(time.time() + int(amount) * unit_seconds[unit])
# ------------------Admin Check--------------- #
async def _callback_check(event):
    """Ask the sender to prove admin rights by pressing an inline button.

    Replies to ``event`` with a button whose callback data is ``cc_<id>``,
    stores ``<id>`` under ``_ult_cache["admin_callback"]`` with a ``None``
    placeholder, then polls once per second until that placeholder has been
    replaced by a truthy value. The slot is filled by code outside this
    function (presumably the ``cc_`` callback handler -- confirm there).

    Returns:
        The value that was stored in the slot; the slot is removed from the
        cache before returning. ``admin_check`` unpacks the result as a
        ``(user, perms)`` pair.

    NOTE(review): there is no timeout -- if the slot is never filled, this
    coroutine keeps polling until it is cancelled.
    """
    id_ = str(uuid.uuid1()).split("-")[0]
    await event.reply(
        "Click Below Button to prove self as Admin!",
        buttons=Button.inline("Click Me", f"cc_{id_}"),
    )
    # Register the pending verification only after the prompt was sent, so a
    # failed reply does not leave a stale entry behind.
    if not _ult_cache.get("admin_callback"):
        _ult_cache["admin_callback"] = {}
    _ult_cache["admin_callback"][id_] = None
    while not _ult_cache["admin_callback"].get(id_):
        await asyncio.sleep(1)
    # Hand the result over and drop the slot in one step.
    return _ult_cache["admin_callback"].pop(id_)
async def get_update_linked_chat(event):
    """Return the linked chat id of ``event``'s chat, caching the lookup.

    A truthy entry for the chat under ``_ult_cache["LINKED_CHATS"]`` is
    answered from the cache. Otherwise the full channel is requested through
    ``event.client`` and its ``linked_chat_id`` is stored in the cache as
    ``{"linked_chat": <id>}`` before being returned.
    """
    chat_id = event.chat_id
    known = _ult_cache.get("LINKED_CHATS")
    if known and known.get(chat_id):
        return known[chat_id]["linked_chat"]

    full = await event.client(functions.channels.GetFullChannelRequest(chat_id))
    linked_id = full.full_chat.linked_chat_id
    entry = {"linked_chat": linked_id}
    # Look the cache up again: it may have changed while the request ran.
    if _ult_cache.get("LINKED_CHATS"):
        _ult_cache["LINKED_CHATS"][chat_id] = entry
    else:
        _ult_cache["LINKED_CHATS"] = {chat_id: entry}
    return linked_id
async def admin_check(event, require=None, silent: bool = False):
    """Decide whether the sender of ``event`` may run an admin-only command.

    Args:
        event: The incoming event; its ``sender``, ``sender_id``, ``chat_id``
            and ``client`` are read, and ``reply`` / ``eor`` are used to
            report failures.
        require: Optional name of an attribute on the permissions object that
            must be truthy (looked up with ``getattr``).
        silent: When true, no message is sent to the chat and no button
            verification is requested.

    Returns:
        ``True`` when the check passes. Every failure path returns a falsy
        value, but that value is ``False`` on some paths and ``None`` on
        others (see the explicit ``return`` statements below).
    """
    if SUDO_M and event.sender_id in SUDO_M.owner_and_sudos():
        return True

    sender = event.sender
    needs_button = False

    # for Anonymous Admin Support
    if (
        isinstance(sender, (types.Chat, types.Channel))
        and event.sender_id == event.chat_id
    ):
        if not require:
            return True
        needs_button = True

    if isinstance(sender, types.Channel):
        linked_id = await get_update_linked_chat(event)
        # A message coming from the chat's linked channel is rejected.
        if linked_id and sender.id == linked_id:
            return False
        needs_button = True

    if not needs_button:
        user = sender
        try:
            perms = await event.client.get_permissions(event.chat_id, user.id)
        except UserNotParticipantError:
            if not silent:
                await event.reply("You need to join this chat First!")
            return False
    elif silent:
        # work silently, same check is used for antiflood
        # and should not ask for Button Verification.
        return None
    else:
        verified = await _callback_check(event)
        if not verified:
            return None
        user, perms = verified
        # From here on, the event is attributed to the verified user.
        event._sender_id = user.id
        event._sender = user

    if not perms.is_admin:
        if not silent:
            await event.eor("Only Admins can use this command!", time=8)
        return None
    if require and not getattr(perms, require, False):
        if not silent:
            await event.eor(f"You are missing the right of `{require}`", time=8)
        return False
    return True
# ------------------Lock Unlock----------------
def lock_unlock(query, lock=True):
    """Build the ``ChatBannedRights`` object for a lock/unlock keyword.

    Used in the locks plugin.

    Args:
        query: Keyword naming what to lock: ``msgs``, ``media``, ``sticker``,
            ``gif``, ``games``, ``inline``, ``polls``, ``invites``, ``pin``
            or ``changeinfo``.
        lock: Value assigned to every flag selected by ``query``.

    Returns:
        A ``types.ChatBannedRights`` created with ``None`` as its first
        argument and the selected flag(s) set to ``lock``, or ``None`` when
        ``query`` is not a known keyword.
    """
    flags_by_query = {
        # The original list was missing a comma between the last two names,
        # so they were concatenated and neither flag was ever set.
        "msgs": ("send_messages", "invite_users", "pin_messages", "change_info"),
        "media": ("send_media",),
        "sticker": ("send_stickers",),
        "gif": ("send_gifs",),
        "games": ("send_games",),
        "inline": ("send_inline",),
        "polls": ("send_polls",),
        "invites": ("invite_users",),
        "pin": ("pin_messages",),
        "changeinfo": ("change_info",),
    }
    flags = flags_by_query.get(query)
    if flags is None:
        return None
    rights = types.ChatBannedRights(None)
    for flag in flags:
        setattr(rights, flag, lock)
    return rights
# ---------------- END ---------------- #
|