# Ultroid - UserBot # Copyright (C) 2021-2025 TeamUltroid # # This file is a part of < https://github.com/TeamUltroid/Ultroid/ > # PLease read the GNU Affero General Public License in # . import asyncio import inspect import re import sys from io import BytesIO from pathlib import Path from time import gmtime, strftime from traceback import format_exc from telethon import Button from telethon import __version__ as telever from telethon import events from telethon.errors.common import AlreadyInConversationError from telethon.errors.rpcerrorlist import ( AuthKeyDuplicatedError, BotInlineDisabledError, BotMethodInvalidError, ChatSendInlineForbiddenError, ChatSendMediaForbiddenError, ChatSendStickersForbiddenError, FloodWaitError, MessageDeleteForbiddenError, MessageIdInvalidError, MessageNotModifiedError, UserIsBotError, ) from telethon.events import MessageEdited, NewMessage from telethon.utils import get_display_name from pyUltroid.exceptions import DependencyMissingError from strings import get_string from .. import * from .. import _ignore_eval from ..dB import DEVLIST from ..dB._core import LIST, LOADED from ..fns.admins import admin_check from ..fns.helper import bash from ..fns.helper import time_formatter as tf from ..version import __version__ as pyver from ..version import ultroid_version as ult_ver from . import SUDO_M, owner_and_sudos from ._wrappers import eod MANAGER = udB.get_key("MANAGER") TAKE_EDITS = udB.get_key("TAKE_EDITS") black_list_chats = udB.get_key("BLACKLIST_CHATS") allow_sudo = SUDO_M.should_allow_sudo def compile_pattern(data, hndlr): if data.startswith("^"): data = data[1:] if data.startswith("."): data = data[1:] if hndlr in [" ", "NO_HNDLR"]: # No Hndlr Feature return re.compile("^" + data) return re.compile("\\" + hndlr + data) def ultroid_cmd( pattern=None, manager=False, ultroid_bot=ultroid_bot, asst=asst, **kwargs ): owner_only = kwargs.get("owner_only", False) groups_only = kwargs.get("groups_only", False) admins_only = kwargs.get("admins_only", False) fullsudo = kwargs.get("fullsudo", False) only_devs = kwargs.get("only_devs", False) func = kwargs.get("func", lambda e: not e.via_bot_id) def decor(dec): async def wrapp(ult): if not ult.out: if owner_only: return if ult.sender_id not in owner_and_sudos(): return if ult.sender_id in _ignore_eval: return await eod( ult, get_string("py_d1"), ) if fullsudo and ult.sender_id not in SUDO_M.fullsudos: return await eod(ult, get_string("py_d2"), time=15) chat = ult.chat if hasattr(chat, "title"): if ( "#noub" in chat.title.lower() and not (chat.admin_rights or chat.creator) and not (ult.sender_id in DEVLIST) ): return if ult.is_private and (groups_only or admins_only): return await eod(ult, get_string("py_d3")) elif admins_only and not (chat.admin_rights or chat.creator): return await eod(ult, get_string("py_d5")) if only_devs and not udB.get_key("I_DEV"): return await eod( ult, get_string("py_d4").format(HNDLR), time=10, ) try: await dec(ult) except FloodWaitError as fwerr: await asst.send_message( udB.get_key("LOG_CHANNEL"), f"`FloodWaitError:\n{str(fwerr)}\n\nSleeping for {tf((fwerr.seconds + 10)*1000)}`", ) await ultroid_bot.disconnect() await asyncio.sleep(fwerr.seconds + 10) await ultroid_bot.connect() await asst.send_message( udB.get_key("LOG_CHANNEL"), "`Bot is working again`", ) return except ChatSendInlineForbiddenError: return await eod(ult, "`Inline Locked In This Chat.`") except (ChatSendMediaForbiddenError, ChatSendStickersForbiddenError): return await eod(ult, get_string("py_d8")) except (BotMethodInvalidError, UserIsBotError): return await eod(ult, get_string("py_d6")) except AlreadyInConversationError: return await eod( ult, get_string("py_d7"), ) except (BotInlineDisabledError, DependencyMissingError) as er: return await eod(ult, f"`{er}`") except ( MessageIdInvalidError, MessageNotModifiedError, MessageDeleteForbiddenError, ) as er: LOGS.exception(er) except AuthKeyDuplicatedError as er: LOGS.exception(er) await asst.send_message( udB.get_key("LOG_CHANNEL"), "Session String expired, create new session from 👇", buttons=[ Button.url("Bot", "t.me/SessionGeneratorBot?start="), Button.url( "Repl", "https://replit.com/@TheUltroid/UltroidStringSession", ), ], ) sys.exit() except events.StopPropagation: raise events.StopPropagation except KeyboardInterrupt: pass except Exception as e: LOGS.exception(e) date = strftime("%Y-%m-%d %H:%M:%S", gmtime()) naam = get_display_name(chat) ftext = "**Ultroid Client Error:** `Forward this to` @UltroidSupportChat\n\n" ftext += "**Py-Ultroid Version:** `" + str(pyver) ftext += "`\n**Ultroid Version:** `" + str(ult_ver) ftext += "`\n**Telethon Version:** `" + str(telever) ftext += f"`\n**Hosted At:** `{HOSTED_ON}`\n\n" ftext += "--------START ULTROID CRASH LOG--------" ftext += "\n**Date:** `" + date ftext += "`\n**Group:** `" + str(ult.chat_id) + "` " + str(naam) ftext += "\n**Sender ID:** `" + str(ult.sender_id) ftext += "`\n**Replied:** `" + str(ult.is_reply) ftext += "`\n\n**Event Trigger:**`\n" ftext += str(ult.text) ftext += "`\n\n**Traceback info:**`\n" ftext += str(format_exc()) ftext += "`\n\n**Error text:**`\n" ftext += str(sys.exc_info()[1]) ftext += "`\n\n--------END ULTROID CRASH LOG--------" ftext += "\n\n\n**Last 5 commits:**`\n" stdout, stderr = await bash('git log --pretty=format:"%an: %s" -5') result = stdout + (stderr or "") ftext += f"{result}`" if len(ftext) > 4096: with BytesIO(ftext.encode()) as file: file.name = "logs.txt" error_log = await asst.send_file( udB.get_key("LOG_CHANNEL"), file, caption="**Ultroid Client Error:** `Forward this to` @UltroidSupportChat\n\n", ) else: error_log = await asst.send_message( udB.get_key("LOG_CHANNEL"), ftext, ) if ult.out: await ult.edit( f"[An error occurred]", link_preview=False, parse_mode="html", ) cmd = None blacklist_chats = False chats = None if black_list_chats: blacklist_chats = True chats = list(black_list_chats) _add_new = allow_sudo and HNDLR != SUDO_HNDLR if _add_new: if pattern: cmd = compile_pattern(pattern, SUDO_HNDLR) ultroid_bot.add_event_handler( wrapp, NewMessage( pattern=cmd, incoming=True, forwards=False, func=func, chats=chats, blacklist_chats=blacklist_chats, ), ) if pattern: cmd = compile_pattern(pattern, HNDLR) ultroid_bot.add_event_handler( wrapp, NewMessage( outgoing=True if _add_new else None, pattern=cmd, forwards=False, func=func, chats=chats, blacklist_chats=blacklist_chats, ), ) if TAKE_EDITS: def func_(x): return not x.via_bot_id and not (x.is_channel and x.chat.broadcast) ultroid_bot.add_event_handler( wrapp, MessageEdited( pattern=cmd, forwards=False, func=func_, chats=chats, blacklist_chats=blacklist_chats, ), ) if manager and MANAGER: allow_all = kwargs.get("allow_all", False) allow_pm = kwargs.get("allow_pm", False) require = kwargs.get("require", None) async def manager_cmd(ult): if not allow_all and not (await admin_check(ult, require=require)): return if not allow_pm and ult.is_private: return try: await dec(ult) except Exception as er: if chat := udB.get_key("MANAGER_LOG"): text = f"**#MANAGER_LOG\n\nChat:** `{get_display_name(ult.chat)}` `{ult.chat_id}`" text += f"\n**Replied :** `{ult.is_reply}`\n**Command :** {ult.text}\n\n**Error :** `{er}`" try: return await asst.send_message( chat, text, link_preview=False ) except Exception as er: LOGS.exception(er) LOGS.info(f"• MANAGER [{ult.chat_id}]:") LOGS.exception(er) if pattern: cmd = compile_pattern(pattern, "/") asst.add_event_handler( manager_cmd, NewMessage( pattern=cmd, forwards=False, incoming=True, func=func, chats=chats, blacklist_chats=blacklist_chats, ), ) if DUAL_MODE and not (manager and DUAL_HNDLR == "/"): if pattern: cmd = compile_pattern(pattern, DUAL_HNDLR) asst.add_event_handler( wrapp, NewMessage( pattern=cmd, incoming=True, forwards=False, func=func, chats=chats, blacklist_chats=blacklist_chats, ), ) file = Path(inspect.stack()[1].filename) if "addons/" in str(file): if LOADED.get(file.stem): LOADED[file.stem].append(wrapp) else: LOADED.update({file.stem: [wrapp]}) if pattern: if LIST.get(file.stem): LIST[file.stem].append(pattern) else: LIST.update({file.stem: [pattern]}) return wrapp return decor