|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import asyncio |
|
import inspect |
|
import re |
|
import sys |
|
from io import BytesIO |
|
from pathlib import Path |
|
from time import gmtime, strftime |
|
from traceback import format_exc |
|
|
|
from telethon import Button |
|
from telethon import __version__ as telever |
|
from telethon import events |
|
from telethon.errors.common import AlreadyInConversationError |
|
from telethon.errors.rpcerrorlist import ( |
|
AuthKeyDuplicatedError, |
|
BotInlineDisabledError, |
|
BotMethodInvalidError, |
|
ChatSendInlineForbiddenError, |
|
ChatSendMediaForbiddenError, |
|
ChatSendStickersForbiddenError, |
|
FloodWaitError, |
|
MessageDeleteForbiddenError, |
|
MessageIdInvalidError, |
|
MessageNotModifiedError, |
|
UserIsBotError, |
|
) |
|
from telethon.events import MessageEdited, NewMessage |
|
from telethon.utils import get_display_name |
|
|
|
from pyUltroid.exceptions import DependencyMissingError |
|
from strings import get_string |
|
|
|
from .. import * |
|
from .. import _ignore_eval |
|
from ..dB import DEVLIST |
|
from ..dB._core import LIST, LOADED |
|
from ..fns.admins import admin_check |
|
from ..fns.helper import bash |
|
from ..fns.helper import time_formatter as tf |
|
from ..version import __version__ as pyver |
|
from ..version import ultroid_version as ult_ver |
|
from . import SUDO_M, owner_and_sudos |
|
from ._wrappers import eod |
|
|
|
MANAGER = udB.get_key("MANAGER") |
|
TAKE_EDITS = udB.get_key("TAKE_EDITS") |
|
black_list_chats = udB.get_key("BLACKLIST_CHATS") |
|
allow_sudo = SUDO_M.should_allow_sudo |
|
|
|
|
|
def compile_pattern(data, hndlr): |
|
if data.startswith("^"): |
|
data = data[1:] |
|
if data.startswith("."): |
|
data = data[1:] |
|
if hndlr in [" ", "NO_HNDLR"]: |
|
|
|
return re.compile("^" + data) |
|
return re.compile("\\" + hndlr + data) |
|
|
|
|
|
def ultroid_cmd( |
|
pattern=None, manager=False, ultroid_bot=ultroid_bot, asst=asst, **kwargs |
|
): |
|
owner_only = kwargs.get("owner_only", False) |
|
groups_only = kwargs.get("groups_only", False) |
|
admins_only = kwargs.get("admins_only", False) |
|
fullsudo = kwargs.get("fullsudo", False) |
|
only_devs = kwargs.get("only_devs", False) |
|
func = kwargs.get("func", lambda e: not e.via_bot_id) |
|
|
|
def decor(dec): |
|
async def wrapp(ult): |
|
if not ult.out: |
|
if owner_only: |
|
return |
|
if ult.sender_id not in owner_and_sudos(): |
|
return |
|
if ult.sender_id in _ignore_eval: |
|
return await eod( |
|
ult, |
|
get_string("py_d1"), |
|
) |
|
if fullsudo and ult.sender_id not in SUDO_M.fullsudos: |
|
return await eod(ult, get_string("py_d2"), time=15) |
|
chat = ult.chat |
|
if hasattr(chat, "title"): |
|
if ( |
|
"#noub" in chat.title.lower() |
|
and not (chat.admin_rights or chat.creator) |
|
and not (ult.sender_id in DEVLIST) |
|
): |
|
return |
|
if ult.is_private and (groups_only or admins_only): |
|
return await eod(ult, get_string("py_d3")) |
|
elif admins_only and not (chat.admin_rights or chat.creator): |
|
return await eod(ult, get_string("py_d5")) |
|
if only_devs and not udB.get_key("I_DEV"): |
|
return await eod( |
|
ult, |
|
get_string("py_d4").format(HNDLR), |
|
time=10, |
|
) |
|
try: |
|
await dec(ult) |
|
except FloodWaitError as fwerr: |
|
await asst.send_message( |
|
udB.get_key("LOG_CHANNEL"), |
|
f"`FloodWaitError:\n{str(fwerr)}\n\nSleeping for {tf((fwerr.seconds + 10)*1000)}`", |
|
) |
|
await ultroid_bot.disconnect() |
|
await asyncio.sleep(fwerr.seconds + 10) |
|
await ultroid_bot.connect() |
|
await asst.send_message( |
|
udB.get_key("LOG_CHANNEL"), |
|
"`Bot is working again`", |
|
) |
|
return |
|
except ChatSendInlineForbiddenError: |
|
return await eod(ult, "`Inline Locked In This Chat.`") |
|
except (ChatSendMediaForbiddenError, ChatSendStickersForbiddenError): |
|
return await eod(ult, get_string("py_d8")) |
|
except (BotMethodInvalidError, UserIsBotError): |
|
return await eod(ult, get_string("py_d6")) |
|
except AlreadyInConversationError: |
|
return await eod( |
|
ult, |
|
get_string("py_d7"), |
|
) |
|
except (BotInlineDisabledError, DependencyMissingError) as er: |
|
return await eod(ult, f"`{er}`") |
|
except ( |
|
MessageIdInvalidError, |
|
MessageNotModifiedError, |
|
MessageDeleteForbiddenError, |
|
) as er: |
|
LOGS.exception(er) |
|
except AuthKeyDuplicatedError as er: |
|
LOGS.exception(er) |
|
await asst.send_message( |
|
udB.get_key("LOG_CHANNEL"), |
|
"Session String expired, create new session from π", |
|
buttons=[ |
|
Button.url("Bot", "t.me/SessionGeneratorBot?start="), |
|
Button.url( |
|
"Repl", |
|
"https://replit.com/@TheUltroid/UltroidStringSession", |
|
), |
|
], |
|
) |
|
sys.exit() |
|
except events.StopPropagation: |
|
raise events.StopPropagation |
|
except KeyboardInterrupt: |
|
pass |
|
except Exception as e: |
|
LOGS.exception(e) |
|
date = strftime("%Y-%m-%d %H:%M:%S", gmtime()) |
|
naam = get_display_name(chat) |
|
ftext = "**Ultroid Client Error:** `Forward this to` @UltroidSupportChat\n\n" |
|
ftext += "**Py-Ultroid Version:** `" + str(pyver) |
|
ftext += "`\n**Ultroid Version:** `" + str(ult_ver) |
|
ftext += "`\n**Telethon Version:** `" + str(telever) |
|
ftext += f"`\n**Hosted At:** `{HOSTED_ON}`\n\n" |
|
ftext += "--------START ULTROID CRASH LOG--------" |
|
ftext += "\n**Date:** `" + date |
|
ftext += "`\n**Group:** `" + str(ult.chat_id) + "` " + str(naam) |
|
ftext += "\n**Sender ID:** `" + str(ult.sender_id) |
|
ftext += "`\n**Replied:** `" + str(ult.is_reply) |
|
ftext += "`\n\n**Event Trigger:**`\n" |
|
ftext += str(ult.text) |
|
ftext += "`\n\n**Traceback info:**`\n" |
|
ftext += str(format_exc()) |
|
ftext += "`\n\n**Error text:**`\n" |
|
ftext += str(sys.exc_info()[1]) |
|
ftext += "`\n\n--------END ULTROID CRASH LOG--------" |
|
ftext += "\n\n\n**Last 5 commits:**`\n" |
|
|
|
stdout, stderr = await bash('git log --pretty=format:"%an: %s" -5') |
|
result = stdout + (stderr or "") |
|
|
|
ftext += f"{result}`" |
|
|
|
if len(ftext) > 4096: |
|
with BytesIO(ftext.encode()) as file: |
|
file.name = "logs.txt" |
|
error_log = await asst.send_file( |
|
udB.get_key("LOG_CHANNEL"), |
|
file, |
|
caption="**Ultroid Client Error:** `Forward this to` @UltroidSupportChat\n\n", |
|
) |
|
else: |
|
error_log = await asst.send_message( |
|
udB.get_key("LOG_CHANNEL"), |
|
ftext, |
|
) |
|
if ult.out: |
|
await ult.edit( |
|
f"<b><a href={error_log.message_link}>[An error occurred]</a></b>", |
|
link_preview=False, |
|
parse_mode="html", |
|
) |
|
|
|
cmd = None |
|
blacklist_chats = False |
|
chats = None |
|
if black_list_chats: |
|
blacklist_chats = True |
|
chats = list(black_list_chats) |
|
_add_new = allow_sudo and HNDLR != SUDO_HNDLR |
|
if _add_new: |
|
if pattern: |
|
cmd = compile_pattern(pattern, SUDO_HNDLR) |
|
ultroid_bot.add_event_handler( |
|
wrapp, |
|
NewMessage( |
|
pattern=cmd, |
|
incoming=True, |
|
forwards=False, |
|
func=func, |
|
chats=chats, |
|
blacklist_chats=blacklist_chats, |
|
), |
|
) |
|
if pattern: |
|
cmd = compile_pattern(pattern, HNDLR) |
|
ultroid_bot.add_event_handler( |
|
wrapp, |
|
NewMessage( |
|
outgoing=True if _add_new else None, |
|
pattern=cmd, |
|
forwards=False, |
|
func=func, |
|
chats=chats, |
|
blacklist_chats=blacklist_chats, |
|
), |
|
) |
|
if TAKE_EDITS: |
|
|
|
def func_(x): |
|
return not x.via_bot_id and not (x.is_channel and x.chat.broadcast) |
|
|
|
ultroid_bot.add_event_handler( |
|
wrapp, |
|
MessageEdited( |
|
pattern=cmd, |
|
forwards=False, |
|
func=func_, |
|
chats=chats, |
|
blacklist_chats=blacklist_chats, |
|
), |
|
) |
|
if manager and MANAGER: |
|
allow_all = kwargs.get("allow_all", False) |
|
allow_pm = kwargs.get("allow_pm", False) |
|
require = kwargs.get("require", None) |
|
|
|
async def manager_cmd(ult): |
|
if not allow_all and not (await admin_check(ult, require=require)): |
|
return |
|
if not allow_pm and ult.is_private: |
|
return |
|
try: |
|
await dec(ult) |
|
except Exception as er: |
|
if chat := udB.get_key("MANAGER_LOG"): |
|
text = f"**#MANAGER_LOG\n\nChat:** `{get_display_name(ult.chat)}` `{ult.chat_id}`" |
|
text += f"\n**Replied :** `{ult.is_reply}`\n**Command :** {ult.text}\n\n**Error :** `{er}`" |
|
try: |
|
return await asst.send_message( |
|
chat, text, link_preview=False |
|
) |
|
except Exception as er: |
|
LOGS.exception(er) |
|
LOGS.info(f"β’ MANAGER [{ult.chat_id}]:") |
|
LOGS.exception(er) |
|
|
|
if pattern: |
|
cmd = compile_pattern(pattern, "/") |
|
asst.add_event_handler( |
|
manager_cmd, |
|
NewMessage( |
|
pattern=cmd, |
|
forwards=False, |
|
incoming=True, |
|
func=func, |
|
chats=chats, |
|
blacklist_chats=blacklist_chats, |
|
), |
|
) |
|
if DUAL_MODE and not (manager and DUAL_HNDLR == "/"): |
|
if pattern: |
|
cmd = compile_pattern(pattern, DUAL_HNDLR) |
|
asst.add_event_handler( |
|
wrapp, |
|
NewMessage( |
|
pattern=cmd, |
|
incoming=True, |
|
forwards=False, |
|
func=func, |
|
chats=chats, |
|
blacklist_chats=blacklist_chats, |
|
), |
|
) |
|
file = Path(inspect.stack()[1].filename) |
|
if "addons/" in str(file): |
|
if LOADED.get(file.stem): |
|
LOADED[file.stem].append(wrapp) |
|
else: |
|
LOADED.update({file.stem: [wrapp]}) |
|
if pattern: |
|
if LIST.get(file.stem): |
|
LIST[file.stem].append(pattern) |
|
else: |
|
LIST.update({file.stem: [pattern]}) |
|
return wrapp |
|
|
|
return decor |
|
|