|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import asyncio |
|
import os |
|
import random |
|
import shutil |
|
import time |
|
from random import randint |
|
|
|
from ..configs import Var |
|
|
|
try: |
|
from pytz import timezone |
|
except ImportError: |
|
timezone = None |
|
|
|
from telethon.errors import ( |
|
ChannelsTooMuchError, |
|
ChatAdminRequiredError, |
|
MessageIdInvalidError, |
|
MessageNotModifiedError, |
|
UserNotParticipantError, |
|
) |
|
from telethon.tl.custom import Button |
|
from telethon.tl.functions.channels import ( |
|
CreateChannelRequest, |
|
EditAdminRequest, |
|
EditPhotoRequest, |
|
InviteToChannelRequest, |
|
) |
|
from telethon.tl.functions.contacts import UnblockRequest |
|
from telethon.tl.types import ( |
|
ChatAdminRights, |
|
ChatPhotoEmpty, |
|
InputChatUploadedPhoto, |
|
InputMessagesFilterDocument, |
|
) |
|
from telethon.utils import get_peer_id |
|
from decouple import config, RepositoryEnv |
|
from .. import LOGS, ULTConfig |
|
from ..fns.helper import download_file, inline_mention, updater |
|
|
|
db_url = 0 |
|
|
|
|
|
async def autoupdate_local_database(): |
|
from .. import Var, asst, udB, ultroid_bot |
|
|
|
global db_url |
|
db_url = ( |
|
udB.get_key("TGDB_URL") or Var.TGDB_URL or ultroid_bot._cache.get("TGDB_URL") |
|
) |
|
if db_url: |
|
_split = db_url.split("/") |
|
_channel = _split[-2] |
|
_id = _split[-1] |
|
try: |
|
await asst.edit_message( |
|
int(_channel) if _channel.isdigit() else _channel, |
|
message=_id, |
|
file="database.json", |
|
text="**Do not delete this file.**", |
|
) |
|
except MessageNotModifiedError: |
|
return |
|
except MessageIdInvalidError: |
|
pass |
|
try: |
|
LOG_CHANNEL = ( |
|
udB.get_key("LOG_CHANNEL") |
|
or Var.LOG_CHANNEL |
|
or asst._cache.get("LOG_CHANNEL") |
|
or "me" |
|
) |
|
msg = await asst.send_message( |
|
LOG_CHANNEL, "**Do not delete this file.**", file="database.json" |
|
) |
|
asst._cache["TGDB_URL"] = msg.message_link |
|
udB.set_key("TGDB_URL", msg.message_link) |
|
except Exception as ex: |
|
LOGS.error(f"Error on autoupdate_local_database: {ex}") |
|
|
|
|
|
def update_envs(): |
|
"""Update Var. attributes to udB""" |
|
from .. import udB |
|
_envs = [*list(os.environ)] |
|
if ".env" in os.listdir("."): |
|
[_envs.append(_) for _ in list(RepositoryEnv(config._find_file(".")).data)] |
|
for envs in _envs: |
|
if ( |
|
envs in ["LOG_CHANNEL", "BOT_TOKEN", "BOTMODE", "DUAL_MODE", "language"] |
|
or envs in udB.keys() |
|
): |
|
if _value := os.environ.get(envs): |
|
udB.set_key(envs, _value) |
|
else: |
|
udB.set_key(envs, config.config.get(envs)) |
|
|
|
|
|
async def startup_stuff(): |
|
from .. import udB |
|
|
|
x = ["resources/auth", "resources/downloads"] |
|
for x in x: |
|
if not os.path.isdir(x): |
|
os.mkdir(x) |
|
|
|
CT = udB.get_key("CUSTOM_THUMBNAIL") |
|
if CT: |
|
path = "resources/extras/thumbnail.jpg" |
|
ULTConfig.thumb = path |
|
try: |
|
await download_file(CT, path) |
|
except Exception as er: |
|
LOGS.exception(er) |
|
elif CT is False: |
|
ULTConfig.thumb = None |
|
GT = udB.get_key("GDRIVE_AUTH_TOKEN") |
|
if GT: |
|
with open("resources/auth/gdrive_creds.json", "w") as t_file: |
|
t_file.write(GT) |
|
|
|
if udB.get_key("AUTH_TOKEN"): |
|
udB.del_key("AUTH_TOKEN") |
|
|
|
MM = udB.get_key("MEGA_MAIL") |
|
MP = udB.get_key("MEGA_PASS") |
|
if MM and MP: |
|
with open(".megarc", "w") as mega: |
|
mega.write(f"[Login]\nUsername = {MM}\nPassword = {MP}") |
|
|
|
TZ = udB.get_key("TIMEZONE") |
|
if TZ and timezone: |
|
try: |
|
timezone(TZ) |
|
os.environ["TZ"] = TZ |
|
time.tzset() |
|
except AttributeError as er: |
|
LOGS.debug(er) |
|
except BaseException: |
|
LOGS.critical( |
|
"Incorrect Timezone ,\nCheck Available Timezone From Here https://graph.org/Ultroid-06-18-2\nSo Time is Default UTC" |
|
) |
|
os.environ["TZ"] = "UTC" |
|
time.tzset() |
|
|
|
|
|
async def autobot(): |
|
from .. import udB, ultroid_bot |
|
|
|
if udB.get_key("BOT_TOKEN"): |
|
return |
|
await ultroid_bot.start() |
|
LOGS.info("MAKING A TELEGRAM BOT FOR YOU AT @BotFather, Kindly Wait") |
|
who = ultroid_bot.me |
|
name = who.first_name + "'s Bot" |
|
if who.username: |
|
username = who.username + "_bot" |
|
else: |
|
username = "ultroid_" + (str(who.id))[5:] + "_bot" |
|
bf = "@BotFather" |
|
await ultroid_bot(UnblockRequest(bf)) |
|
await ultroid_bot.send_message(bf, "/cancel") |
|
await asyncio.sleep(1) |
|
await ultroid_bot.send_message(bf, "/newbot") |
|
await asyncio.sleep(1) |
|
isdone = (await ultroid_bot.get_messages(bf, limit=1))[0].text |
|
if isdone.startswith("That I cannot do.") or "20 bots" in isdone: |
|
LOGS.critical( |
|
"Please make a Bot from @BotFather and add it's token in BOT_TOKEN, as an env var and restart me." |
|
) |
|
import sys |
|
|
|
sys.exit(1) |
|
await ultroid_bot.send_message(bf, name) |
|
await asyncio.sleep(1) |
|
isdone = (await ultroid_bot.get_messages(bf, limit=1))[0].text |
|
if not isdone.startswith("Good."): |
|
await ultroid_bot.send_message(bf, "My Assistant Bot") |
|
await asyncio.sleep(1) |
|
isdone = (await ultroid_bot.get_messages(bf, limit=1))[0].text |
|
if not isdone.startswith("Good."): |
|
LOGS.critical( |
|
"Please make a Bot from @BotFather and add it's token in BOT_TOKEN, as an env var and restart me." |
|
) |
|
import sys |
|
|
|
sys.exit(1) |
|
await ultroid_bot.send_message(bf, username) |
|
await asyncio.sleep(1) |
|
isdone = (await ultroid_bot.get_messages(bf, limit=1))[0].text |
|
await ultroid_bot.send_read_acknowledge("botfather") |
|
if isdone.startswith("Sorry,"): |
|
ran = randint(1, 100) |
|
username = "ultroid_" + (str(who.id))[6:] + str(ran) + "_bot" |
|
await ultroid_bot.send_message(bf, username) |
|
await asyncio.sleep(1) |
|
isdone = (await ultroid_bot.get_messages(bf, limit=1))[0].text |
|
if isdone.startswith("Done!"): |
|
token = isdone.split("`")[1] |
|
udB.set_key("BOT_TOKEN", token) |
|
await enable_inline(ultroid_bot, username) |
|
LOGS.info( |
|
f"Done. Successfully created @{username} to be used as your assistant bot!" |
|
) |
|
else: |
|
LOGS.info( |
|
"Please Delete Some Of your Telegram bots at @Botfather or Set Var BOT_TOKEN with token of a bot" |
|
) |
|
|
|
import sys |
|
|
|
sys.exit(1) |
|
|
|
|
|
async def autopilot(): |
|
from .. import asst, udB, ultroid_bot |
|
|
|
channel = udB.get_key("LOG_CHANNEL") |
|
new_channel = None |
|
if channel: |
|
try: |
|
chat = await ultroid_bot.get_entity(channel) |
|
except BaseException as err: |
|
LOGS.exception(err) |
|
udB.del_key("LOG_CHANNEL") |
|
channel = None |
|
if not channel: |
|
|
|
async def _save(exc): |
|
udB._cache["LOG_CHANNEL"] = ultroid_bot.me.id |
|
await asst.send_message( |
|
ultroid_bot.me.id, f"Failed to Create Log Channel due to {exc}.." |
|
) |
|
|
|
if ultroid_bot._bot: |
|
msg_ = "'LOG_CHANNEL' not found! Add it in order to use 'BOTMODE'" |
|
LOGS.error(msg_) |
|
return await _save(msg_) |
|
LOGS.info("Creating a Log Channel for You!") |
|
try: |
|
r = await ultroid_bot( |
|
CreateChannelRequest( |
|
title="My Ultroid Logs", |
|
about="My Ultroid Log Group\n\n Join @TeamUltroid", |
|
megagroup=True, |
|
), |
|
) |
|
except ChannelsTooMuchError as er: |
|
LOGS.critical( |
|
"You Are in Too Many Channels & Groups , Leave some And Restart The Bot" |
|
) |
|
return await _save(str(er)) |
|
except BaseException as er: |
|
LOGS.exception(er) |
|
LOGS.info( |
|
"Something Went Wrong , Create A Group and set its id on config var LOG_CHANNEL." |
|
) |
|
|
|
return await _save(str(er)) |
|
new_channel = True |
|
chat = r.chats[0] |
|
channel = get_peer_id(chat) |
|
udB.set_key("LOG_CHANNEL", channel) |
|
assistant = True |
|
try: |
|
await ultroid_bot.get_permissions(int(channel), asst.me.username) |
|
except UserNotParticipantError: |
|
try: |
|
await ultroid_bot(InviteToChannelRequest(int(channel), [asst.me.username])) |
|
except BaseException as er: |
|
LOGS.info("Error while Adding Assistant to Log Channel") |
|
LOGS.exception(er) |
|
assistant = False |
|
except BaseException as er: |
|
assistant = False |
|
LOGS.exception(er) |
|
if assistant and new_channel: |
|
try: |
|
achat = await asst.get_entity(int(channel)) |
|
except BaseException as er: |
|
achat = None |
|
LOGS.info("Error while getting Log channel from Assistant") |
|
LOGS.exception(er) |
|
if achat and not achat.admin_rights: |
|
rights = ChatAdminRights( |
|
add_admins=True, |
|
invite_users=True, |
|
change_info=True, |
|
ban_users=True, |
|
delete_messages=True, |
|
pin_messages=True, |
|
anonymous=False, |
|
manage_call=True, |
|
) |
|
try: |
|
await ultroid_bot( |
|
EditAdminRequest( |
|
int(channel), asst.me.username, rights, "Assistant" |
|
) |
|
) |
|
except ChatAdminRequiredError: |
|
LOGS.info( |
|
"Failed to promote 'Assistant Bot' in 'Log Channel' due to 'Admin Privileges'" |
|
) |
|
except BaseException as er: |
|
LOGS.info("Error while promoting assistant in Log Channel..") |
|
LOGS.exception(er) |
|
if isinstance(chat.photo, ChatPhotoEmpty): |
|
photo, _ = await download_file( |
|
"https://graph.org/file/27c6812becf6f376cbb10.jpg", "channelphoto.jpg" |
|
) |
|
ll = await ultroid_bot.upload_file(photo) |
|
try: |
|
await ultroid_bot( |
|
EditPhotoRequest(int(channel), InputChatUploadedPhoto(ll)) |
|
) |
|
except BaseException as er: |
|
LOGS.exception(er) |
|
os.remove(photo) |
|
|
|
|
|
|
|
|
|
|
|
async def customize(): |
|
from .. import asst, udB, ultroid_bot |
|
|
|
rem = None |
|
try: |
|
chat_id = udB.get_key("LOG_CHANNEL") |
|
if asst.me.photo: |
|
return |
|
LOGS.info("Customising Ur Assistant Bot in @BOTFATHER") |
|
UL = f"@{asst.me.username}" |
|
if not ultroid_bot.me.username: |
|
sir = ultroid_bot.me.first_name |
|
else: |
|
sir = f"@{ultroid_bot.me.username}" |
|
file = random.choice( |
|
[ |
|
"https://graph.org/file/92cd6dbd34b0d1d73a0da.jpg", |
|
"https://graph.org/file/a97973ee0425b523cdc28.jpg", |
|
"resources/extras/ultroid_assistant.jpg", |
|
] |
|
) |
|
if not os.path.exists(file): |
|
file, _ = await download_file(file, "profile.jpg") |
|
rem = True |
|
msg = await asst.send_message( |
|
chat_id, "**Auto Customisation** Started on @Botfather" |
|
) |
|
await asyncio.sleep(1) |
|
await ultroid_bot.send_message("botfather", "/cancel") |
|
await asyncio.sleep(1) |
|
await ultroid_bot.send_message("botfather", "/setuserpic") |
|
await asyncio.sleep(1) |
|
isdone = (await ultroid_bot.get_messages("botfather", limit=1))[0].text |
|
if isdone.startswith("Invalid bot"): |
|
LOGS.info("Error while trying to customise assistant, skipping...") |
|
return |
|
await ultroid_bot.send_message("botfather", UL) |
|
await asyncio.sleep(1) |
|
await ultroid_bot.send_file("botfather", file) |
|
await asyncio.sleep(2) |
|
await ultroid_bot.send_message("botfather", "/setabouttext") |
|
await asyncio.sleep(1) |
|
await ultroid_bot.send_message("botfather", UL) |
|
await asyncio.sleep(1) |
|
await ultroid_bot.send_message( |
|
"botfather", f"β¨ Hello β¨!! I'm Assistant Bot of {sir}" |
|
) |
|
await asyncio.sleep(2) |
|
await ultroid_bot.send_message("botfather", "/setdescription") |
|
await asyncio.sleep(1) |
|
await ultroid_bot.send_message("botfather", UL) |
|
await asyncio.sleep(1) |
|
await ultroid_bot.send_message( |
|
"botfather", |
|
f"β¨ Powerful Ultroid Assistant Bot β¨\nβ¨ Master ~ {sir} β¨\n\nβ¨ Powered By ~ @TeamUltroid β¨", |
|
) |
|
await asyncio.sleep(2) |
|
await msg.edit("Completed **Auto Customisation** at @BotFather.") |
|
if rem: |
|
os.remove(file) |
|
LOGS.info("Customisation Done") |
|
except Exception as e: |
|
LOGS.exception(e) |
|
|
|
|
|
async def plug(plugin_channels): |
|
from .. import ultroid_bot |
|
from .utils import load_addons |
|
|
|
if ultroid_bot._bot: |
|
LOGS.info("Plugin Channels can't be used in 'BOTMODE'") |
|
return |
|
if os.path.exists("addons") and not os.path.exists("addons/.git"): |
|
shutil.rmtree("addons") |
|
if not os.path.exists("addons"): |
|
os.mkdir("addons") |
|
if not os.path.exists("addons/__init__.py"): |
|
with open("addons/__init__.py", "w") as f: |
|
f.write("from plugins import *\n\nbot = ultroid_bot") |
|
LOGS.info("β’ Loading Plugins from Plugin Channel(s) β’") |
|
for chat in plugin_channels: |
|
LOGS.info(f"{'β’'*4} {chat}") |
|
try: |
|
async for x in ultroid_bot.iter_messages( |
|
chat, search=".py", filter=InputMessagesFilterDocument, wait_time=10 |
|
): |
|
plugin = "addons/" + x.file.name.replace("_", "-").replace("|", "-") |
|
if not os.path.exists(plugin): |
|
await asyncio.sleep(0.6) |
|
if x.text == "#IGNORE": |
|
continue |
|
plugin = await x.download_media(plugin) |
|
try: |
|
load_addons(plugin) |
|
except Exception as e: |
|
LOGS.info(f"Ultroid - PLUGIN_CHANNEL - ERROR - {plugin}") |
|
LOGS.exception(e) |
|
os.remove(plugin) |
|
except Exception as er: |
|
LOGS.exception(er) |
|
|
|
|
|
|
|
|
|
|
|
async def fetch_ann(): |
|
from .. import asst, udB |
|
from ..fns.tools import async_searcher |
|
|
|
get_ = udB.get_key("OLDANN") or [] |
|
chat_id = udB.get_key("LOG_CHANNEL") |
|
try: |
|
updts = await async_searcher( |
|
"https://ultroid-api.vercel.app/announcements", post=True, re_json=True |
|
) |
|
for upt in updts: |
|
key = list(upt.keys())[0] |
|
if key not in get_: |
|
cont = upt[key] |
|
if isinstance(cont, dict) and cont.get("lang"): |
|
if cont["lang"] != (udB.get_key("language") or "en"): |
|
continue |
|
cont = cont["msg"] |
|
if isinstance(cont, str): |
|
await asst.send_message(chat_id, cont) |
|
elif isinstance(cont, dict) and cont.get("chat"): |
|
await asst.forward_messages(chat_id, cont["msg_id"], cont["chat"]) |
|
else: |
|
LOGS.info(cont) |
|
LOGS.info( |
|
"Invalid Type of Announcement Detected!\nMake sure you are on latest version.." |
|
) |
|
get_.append(key) |
|
udB.set_key("OLDANN", get_) |
|
except Exception as er: |
|
LOGS.exception(er) |
|
|
|
|
|
async def ready(): |
|
from .. import asst, udB, ultroid_bot |
|
|
|
chat_id = udB.get_key("LOG_CHANNEL") |
|
spam_sent = None |
|
if not udB.get_key("INIT_DEPLOY"): |
|
MSG = """π **Thanks for Deploying Ultroid Userbot!** |
|
β’ Here, are the Some Basic stuff from, where you can Know, about its Usage.""" |
|
PHOTO = "https://graph.org/file/54a917cc9dbb94733ea5f.jpg" |
|
BTTS = Button.inline("β’ Click to Start β’", "initft_2") |
|
udB.set_key("INIT_DEPLOY", "Done") |
|
else: |
|
MSG = f"**Ultroid has been deployed!**\nββββββββββ\n**UserMode**: {inline_mention(ultroid_bot.me)}\n**Assistant**: @{asst.me.username}\nββββββββββ\n**Support**: @TeamUltroid\nββββββββββ" |
|
BTTS, PHOTO = None, None |
|
prev_spam = udB.get_key("LAST_UPDATE_LOG_SPAM") |
|
if prev_spam: |
|
try: |
|
await ultroid_bot.delete_messages(chat_id, int(prev_spam)) |
|
except Exception as E: |
|
LOGS.info("Error while Deleting Previous Update Message :" + str(E)) |
|
if await updater(): |
|
BTTS = Button.inline("Update Available", "updtavail") |
|
|
|
try: |
|
spam_sent = await asst.send_message(chat_id, MSG, file=PHOTO, buttons=BTTS) |
|
except ValueError as e: |
|
try: |
|
await (await ultroid_bot.send_message(chat_id, str(e))).delete() |
|
spam_sent = await asst.send_message(chat_id, MSG, file=PHOTO, buttons=BTTS) |
|
except Exception as g: |
|
LOGS.info(g) |
|
except Exception as el: |
|
LOGS.info(el) |
|
try: |
|
spam_sent = await ultroid_bot.send_message(chat_id, MSG) |
|
except Exception as ef: |
|
LOGS.exception(ef) |
|
if spam_sent and not spam_sent.media: |
|
udB.set_key("LAST_UPDATE_LOG_SPAM", spam_sent.id) |
|
|
|
|
|
|
|
async def WasItRestart(udb): |
|
key = udb.get_key("_RESTART") |
|
if not key: |
|
return |
|
from .. import asst, ultroid_bot |
|
|
|
try: |
|
data = key.split("_") |
|
who = asst if data[0] == "bot" else ultroid_bot |
|
await who.edit_message( |
|
int(data[1]), int(data[2]), "__Restarted Successfully.__" |
|
) |
|
except Exception as er: |
|
LOGS.exception(er) |
|
udb.del_key("_RESTART") |
|
|
|
|
|
def _version_changes(udb): |
|
for _ in [ |
|
"BOT_USERS", |
|
"BOT_BLS", |
|
"VC_SUDOS", |
|
"SUDOS", |
|
"CLEANCHAT", |
|
"LOGUSERS", |
|
"PLUGIN_CHANNEL", |
|
"CH_SOURCE", |
|
"CH_DESTINATION", |
|
"BROADCAST", |
|
]: |
|
key = udb.get_key(_) |
|
if key and str(key)[0] != "[": |
|
key = udb.get(_) |
|
new_ = [ |
|
int(z) if z.isdigit() or (z.startswith("-") and z[1:].isdigit()) else z |
|
for z in key.split() |
|
] |
|
udb.set_key(_, new_) |
|
|
|
|
|
async def enable_inline(ultroid_bot, username): |
|
bf = "BotFather" |
|
await ultroid_bot.send_message(bf, "/setinline") |
|
await asyncio.sleep(1) |
|
await ultroid_bot.send_message(bf, f"@{username}") |
|
await asyncio.sleep(1) |
|
await ultroid_bot.send_message(bf, "Search") |
|
await ultroid_bot.send_read_acknowledge(bf) |
|
|