|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import re |
|
import time |
|
from datetime import datetime |
|
from os import remove |
|
|
|
from git import Repo |
|
from telethon import Button |
|
from telethon.tl.types import InputWebDocument, Message |
|
from telethon.utils import resolve_bot_file_id |
|
|
|
from pyUltroid._misc._assistant import callback, in_pattern |
|
from pyUltroid.dB._core import HELP, LIST |
|
from pyUltroid.fns.helper import gen_chlog, time_formatter, updater |
|
from pyUltroid.fns.misc import split_list |
|
|
|
from . import ( |
|
HNDLR, |
|
LOGS, |
|
OWNER_NAME, |
|
InlinePlugin, |
|
asst, |
|
get_string, |
|
inline_pic, |
|
split_list, |
|
start_time, |
|
udB, |
|
) |
|
from ._help import _main_help_menu |
|
|
|
|
|
|
|
# Captions and plugin listings for the inline help menu, resolved at import time.
helps = get_string("inline_1")

add_ons = udB.get_key("ADDONS")

# "inline_3" is chosen only when the ADDONS key is exactly ``False``;
# every other value (including an unset key) selects "inline_2".
if add_ons is False:
    zhelps = get_string("inline_3")
else:
    zhelps = get_string("inline_2")

PLUGINS = HELP.get("Official", [])
ADDONS = HELP.get("Addons", [])
upage = 0
|
|
|
|
|
|
|
|
|
# Single row of URL buttons (repository + support chat) attached to the
# "alive" inline result.
SUP_BUTTONS = [
    [
        Button.url("• Repo •", "https://github.com/TeamUltroid/Ultroid"),
        Button.url("• Support •", "t.me/UltroidSupportChat"),
    ],
]
|
|
|
|
|
|
|
|
|
@in_pattern(owner=True, func=lambda x: not x.text)
async def inline_alive(o):
    """Answer an empty inline query with one photo article about the userbot.

    The photo is the configured inline picture when ``inline_pic()`` returns
    one, otherwise a fixed graph.org image. The answer also offers a
    "switch to PM" entry with the ``start`` parameter.
    """
    picture_url = inline_pic() or "https://graph.org/file/74d6259983e0642923fdb.jpg"
    caption = "• **Ultroid Userbot •**"
    thumbnail = InputWebDocument(
        "https://graph.org/file/acd4f5d61369f74c5e7a7.jpg", 0, "image/jpg", []
    )
    photo = InputWebDocument(picture_url, 0, "image/jpg", [])

    article = await o.builder.article(
        type="photo",
        text=caption,
        include_media=True,
        buttons=SUP_BUTTONS,
        title="Ultroid Userbot",
        description="Userbot | Telethon",
        url=picture_url,
        thumb=thumbnail,
        content=photo,
    )
    await o.answer(
        [article],
        private=True,
        cache_time=300,
        switch_pm="👥 ULTROID PORTAL",
        switch_pm_param="start",
    )
|
|
|
|
|
@in_pattern("ultd", owner=True)
async def inline_handler(event):
    """Answer the ``ultd`` inline query with the main help menu.

    The caption is the "inline_4" string filled with the owner name, the
    number of official plugins, the number of addons and the total number of
    entries across all values of ``LIST``. A photo result is used when an
    inline picture is configured, a plain article otherwise.
    """
    commands = [cmd for group in LIST.values() for cmd in group]
    official_count = len(HELP.get("Official", []))
    addons_count = len(HELP.get("Addons", []))
    caption = get_string("inline_4").format(
        OWNER_NAME, official_count, addons_count, len(commands)
    )

    if not inline_pic():
        result = await event.builder.article(
            title="Ultroid Help Menu", text=caption, buttons=_main_help_menu
        )
    else:
        result = await event.builder.photo(
            file=inline_pic(),
            link_preview=False,
            text=caption,
            buttons=_main_help_menu,
        )
    await event.answer([result], private=True, cache_time=300, gallery=True)
|
|
|
|
|
@in_pattern("pasta", owner=True)
async def _(event):
    """Answer the ``pasta-<key or url>`` inline query with paste link buttons.

    Everything after the first ``-`` is taken as the paste reference. A bare
    key is expanded to the spaceb.in page and raw-document URLs; a value that
    already starts with ``http`` is used as the link, with ``/raw`` appended
    for the raw button. Queries without a reference are ignored.
    """
    # Split on the first dash only: keys and URLs may themselves contain "-",
    # and a query without any dash must not raise IndexError.
    _prefix, _sep, ok = event.text.partition("-")
    if not ok:
        return
    if ok.startswith("http"):
        link = ok
        raw = f"{ok}/raw"
    else:
        link = f"https://spaceb.in/{ok}"
        raw = f"https://spaceb.in/api/v1/documents/{ok}/raw"
    result = await event.builder.article(
        title="Paste",
        text="Pasted to Spacebin 🌌",
        buttons=[
            [
                Button.url("SpaceBin", url=link),
                Button.url("Raw", url=raw),
            ],
        ],
    )
    await event.answer([result])
|
|
|
|
|
@callback("ownr", owner=True)
async def setting(event):
    """Edit the message into the owner panel.

    Shows the "inline_4" caption (owner name, official/addon plugin counts,
    total entries in ``LIST``) together with the inline picture and the
    Ping / Uptime / Stats / Update / Back buttons.
    """
    commands = [cmd for group in LIST.values() for cmd in group]
    caption = get_string("inline_4").format(
        OWNER_NAME,
        len(HELP.get("Official", [])),
        len(HELP.get("Addons", [])),
        len(commands),
    )
    picture = inline_pic()
    panel = [
        [
            Button.inline("•Pɪɴɢ•", data="pkng"),
            Button.inline("•Uᴘᴛɪᴍᴇ•", data="upp"),
        ],
        [
            Button.inline("•Stats•", data="alive"),
            Button.inline("•Uᴘᴅᴀᴛᴇ•", data="doupdate"),
        ],
        [Button.inline("« Bᴀᴄᴋ", data="open")],
    ]
    await event.edit(caption, file=picture, link_preview=False, buttons=panel)
|
|
|
|
|
# Caption template per help category, looked up by ``help_func``.
_strings = {
    "Official": helps,
    "Addons": zhelps,
    "VCBot": get_string("inline_6"),
}
|
|
|
|
|
@callback(re.compile("uh_(.*)"), owner=True)
async def help_func(ult):
    """Show one page of the plugin list for a help category.

    Callback data has the form ``uh_<key>_<page>`` where ``<page>`` may be
    empty or carry a ``|<page>`` suffix (as produced by the plugin "Back"
    button). Missing VCBot/Addons categories are reported through an alert
    instead of opening an empty page.
    """
    payload = ult.data_match.group(1).decode("utf-8")
    # partition() tolerates data without the trailing "_<page>" part, where a
    # two-way unpack of split("_") would raise ValueError.
    key, _sep, count = payload.partition("_")
    if key == "VCBot" and HELP.get("VCBot") is None:
        return await ult.answer(get_string("help_12"), alert=True)
    elif key == "Addons" and HELP.get("Addons") is None:
        return await ult.answer(get_string("help_13").format(HNDLR), alert=True)
    if "|" in count:
        _, count = count.split("|")
    count = int(count) if count else 0
    # A category that is absent from HELP counts as empty rather than raising
    # TypeError on len(None).
    loaded = HELP.get(key) or {}
    text = _strings.get(key, "").format(OWNER_NAME, len(loaded))
    await ult.edit(text, buttons=page_num(count, key), link_preview=False)
|
|
|
|
|
@callback(re.compile("uplugin_(.*)"), owner=True)
async def uptd_plugin(event):
    """Show the help text of a single plugin.

    Callback data has the form ``uplugin_<key>_<plugin>[|<page>]``. The help
    text comes from ``HELP[key][plugin]``; when that lookup fails, the command
    list in ``LIST[plugin]`` is used instead. A "Send Plugin" button is added
    when an inline picture is configured, and the "Back" button returns to the
    page the plugin was opened from.
    """
    payload = event.data_match.group(1).decode("utf-8")
    # Split on the first "_" only so plugin names containing underscores do
    # not break the two-way unpack.
    key, file = payload.split("_", 1)
    index = None
    if "|" in file:
        # The page index is always the last "|"-separated field.
        file, index = file.rsplit("|", 1)

    key_ = HELP.get(key, [])
    help_ = ""
    try:
        help_ = "".join(key_[file])
    except (KeyError, TypeError):
        # No detailed help registered: fall back to the bare command list.
        if file in LIST:
            help_ = get_string("help_11").format(file)
            help_ += "".join(HNDLR + d + "\n" for d in LIST[file])
    if not help_:
        help_ = f"{file} has no Detailed Help!"
    help_ += "\n© @TeamUltroid"

    page_suffix = f"|{index}" if index is not None else ""
    buttons = []
    if inline_pic():
        buttons.append(
            [
                Button.inline(
                    "« Sᴇɴᴅ Pʟᴜɢɪɴ »",
                    data=f"sndplug_{key}_{file}{page_suffix}",
                )
            ]
        )
    buttons.append(
        [
            Button.inline("« Bᴀᴄᴋ", data=f"uh_{key}_{page_suffix}"),
        ]
    )

    try:
        await event.edit(help_, buttons=buttons)
    except Exception as er:
        # The edit failed (logged below); show a short pointer to the help
        # command instead of the full text.
        LOGS.exception(er)
        fallback = f"Do `{HNDLR}help {key}` to get list of commands."
        await event.edit(fallback, buttons=buttons)
|
|
|
|
|
@callback(data="doupdate", owner=True)
async def _(event):
    """Show the pending changelog with an "Update Now" button.

    Alerts with "inline_9" when ``updater()`` reports nothing to do, and
    points to the update command when no inline picture is configured.
    Changelogs whose rendered text exceeds 1024 characters are sent as a
    text-file attachment; shorter ones are shown inline as HTML.
    """
    if not await updater():
        return await event.answer(get_string("inline_9"), cache_time=0, alert=True)
    if not inline_pic():
        return await event.answer(f"Do '{HNDLR}update' to update..")

    repo = Repo.init()
    changelog, tl_chnglog = await gen_chlog(
        repo, f"HEAD..upstream/{repo.active_branch}"
    )
    changelog_str = changelog + "\n\n" + get_string("inline_8")

    if len(changelog_str) > 1024:
        await event.edit(get_string("upd_4"))
        changelog_file = "ultroid_updates.txt"
        # Explicit UTF-8 so non-ASCII commit messages do not depend on the
        # platform's default encoding.
        with open(changelog_file, "w+", encoding="utf-8") as file:
            file.write(tl_chnglog)
        try:
            await event.edit(
                get_string("upd_5"),
                file=changelog_file,
                buttons=[
                    [Button.inline("• Uᴘᴅᴀᴛᴇ Nᴏᴡ •", data="updatenow")],
                    [Button.inline("« Bᴀᴄᴋ", data="ownr")],
                ],
            )
        finally:
            # Remove the temporary file even when the upload/edit fails.
            remove(changelog_file)
    else:
        await event.edit(
            changelog_str,
            buttons=[
                [Button.inline("Update Now", data="updatenow")],
                [Button.inline("« Bᴀᴄᴋ", data="ownr")],
            ],
            parse_mode="html",
        )
|
|
|
|
|
@callback(data="pkng", owner=True)
async def _(event):
    """Answer the "pkng" callback with an alert showing a ping figure.

    The figure is the microsecond component of the time between two
    back-to-back ``datetime.now()`` reads.
    """
    # NOTE(review): no network round trip happens between the two clock
    # reads, so this does not measure latency to Telegram — confirm intent.
    began = datetime.now()
    elapsed = datetime.now() - began
    await event.answer(
        f"🌋Pɪɴɢ = {elapsed.microseconds} microseconds",
        cache_time=0,
        alert=True,
    )
|
|
|
|
|
@callback(data="upp", owner=True)
async def _(event):
    """Answer the "upp" callback with an alert showing the formatted uptime."""
    # time_formatter is handed milliseconds elapsed since ``start_time``.
    elapsed_ms = (time.time() - start_time) * 1000
    await event.answer(
        f"🙋Uᴘᴛɪᴍᴇ = {time_formatter(elapsed_ms)}",
        cache_time=0,
        alert=True,
    )
|
|
|
|
|
@callback(data="inlone", owner=True)
async def _(e):
    """Replace the message buttons with one switch-inline button per inline plugin.

    Buttons are laid out two per row, followed by a "Back" row that returns
    to the main menu.
    """
    switches = [
        Button.switch_inline(name, query=InlinePlugin[name], same_peer=True)
        for name in InlinePlugin
    ]
    back_row = [
        Button.inline("« Bᴀᴄᴋ", data="open"),
    ]
    # Build a fresh list so the rows returned by split_list stay untouched.
    layout = [*split_list(switches, 2), back_row]
    await e.edit(buttons=layout, link_preview=False)
|
|
|
|
|
@callback(data="open", owner=True)
async def opner(event):
    """Edit the message back into the main help menu.

    The caption is the "inline_4" string filled with the owner name, the
    official/addon plugin counts and the total number of entries in ``LIST``.
    """
    total_commands = [cmd for group in LIST.values() for cmd in group]
    official = HELP.get("Official", [])
    addons = HELP.get("Addons", [])
    caption = get_string("inline_4").format(
        OWNER_NAME, len(official), len(addons), len(total_commands)
    )
    await event.edit(caption, buttons=_main_help_menu, link_preview=False)
|
|
|
|
|
@callback(data="close", owner=True)
async def on_plug_in_callback_query_handler(event):
    """Collapse the help menu to the "inline_5" text with a single reopen button."""
    closed_text = get_string("inline_5")
    reopen = Button.inline("Oᴘᴇɴ Aɢᴀɪɴ", data="open")
    await event.edit(closed_text, buttons=reopen)
|
|
|
|
|
def page_num(index, key):
    """Build the button rows for one page of a help category.

    Args:
        index: Requested page number. Negative values index from the end;
            values outside the available pages fall back to the first page.
        key: Help category whose plugin names are listed (a key of ``HELP``).

    Returns:
        A list of button rows: the plugin buttons of the selected page
        followed by a navigation row. The navigation row holds only "Back"
        when there is at most one page, otherwise Previous / Back / Next.
    """
    rows = udB.get_key("HELP_ROWS") or 5
    cols = udB.get_key("HELP_COLUMNS") or 2
    loaded = HELP.get(key, [])
    emoji = udB.get_key("EMOJI_IN_HELP") or "✘"
    plugin_buttons = [
        Button.inline(f"{emoji} {x} {emoji}", data=f"uplugin_{key}_{x}|{index}")
        for x in sorted(loaded)
    ]
    # First chunk into rows of `cols` buttons, then into pages of `rows` rows.
    pages = split_list(split_list(plugin_buttons, cols), rows)
    try:
        page = pages[index]
    except IndexError:
        page = pages[0] if pages else []
        index = 0

    if len(pages) <= 1:
        # Nothing to page through (including the empty case), so Previous /
        # Next would only lead back to this same page.
        page.append([Button.inline("« Bᴀᴄᴋ »", data="open")])
    else:
        page.append(
            [
                Button.inline(
                    "« Pʀᴇᴠɪᴏᴜs",
                    data=f"uh_{key}_{index-1}",
                ),
                Button.inline("« Bᴀᴄᴋ »", data="open"),
                Button.inline(
                    "Nᴇxᴛ »",
                    data=f"uh_{key}_{index+1}",
                ),
            ]
        )
    return page
|
|
|
|
|
|
|
|
|
|
|
# Payloads queued by ``something`` and read back by the ``stf<N>`` inline
# handler; maps an integer slot number to {"msg", "media", "button"}.
STUFF = dict()
|
|
|
|
|
@in_pattern("stf(.*)", owner=True)
async def ibuild(e):
    """Answer the ``stf<N>`` inline query with the payload stored in ``STUFF[N]``.

    The stored media decides the result type: a value ending in an image
    extension becomes a photo article, a video/gif extension becomes a gif
    article, and anything else is tried as a Telethon object or bot file id
    and sent as a document. When the media cannot be used, or building the
    media result fails, a plain text article is returned instead.
    Queries without a numeric suffix or with an unknown slot are ignored.
    """
    n = e.pattern_match.group(1).strip()
    builder = e.builder
    if not (n and n.isdigit()):
        return
    ok = STUFF.get(int(n))
    if not ok:
        # Unknown slot: nothing was stored under this number, and calling
        # .get() on None would raise AttributeError.
        return
    txt = ok.get("msg")
    pic = ok.get("media")
    btn = ok.get("button")
    if not (pic or txt):
        txt = "Hey!"
    if pic:
        try:
            include_media = True
            mime_type, _pic = None, None
            cont, results = None, None
            ext = str(pic).split(".")[-1].lower()
            if ext in ("img", "jpg", "png"):
                _type = "photo"
                mime_type = "image/jpg"
            elif ext in ("mp4", "mkv", "gif"):
                mime_type = "video/mp4"
                _type = "gif"
            else:
                try:
                    if "telethon.tl.types" in str(type(pic)):
                        _pic = pic
                    else:
                        _pic = resolve_bot_file_id(pic)
                except Exception:
                    # Best effort: media that cannot be resolved falls
                    # through to the text-only article below.
                    pass
                if _pic:
                    results = [
                        await builder.document(
                            _pic,
                            title="Ultroid Op",
                            text=txt,
                            description="@TeamUltroid",
                            buttons=btn,
                            link_preview=False,
                        )
                    ]
                else:
                    _type = "article"
                    include_media = False
            if not results:
                if include_media:
                    cont = InputWebDocument(pic, 0, mime_type, [])
                results = [
                    await builder.article(
                        title="Ultroid Op",
                        type=_type,
                        text=txt,
                        description="@TeamUltroid",
                        include_media=include_media,
                        buttons=btn,
                        thumb=cont,
                        content=cont,
                        link_preview=False,
                    )
                ]
            return await e.answer(results)
        except Exception as er:
            # Building or answering with media failed; log it and fall back
            # to the plain article below.
            LOGS.exception(er)
    result = [
        await builder.article("Ultroid Op", text=txt, link_preview=False, buttons=btn)
    ]
    await e.answer(result)
|
|
|
|
|
async def something(e, msg, media, button, reply=True, chat=None):
    """Send a message with inline buttons, going through the assistant when needed.

    When the client behind ``e`` is a bot, the message is sent directly as a
    reply. Otherwise the payload is stored in ``STUFF`` under a new slot
    number, the assistant is queried inline with ``stf<slot>`` and the first
    result is clicked into the target chat.

    Args:
        e: Event or message that triggered the send.
        msg: Message text.
        media: Media to attach, if any.
        button: Buttons to attach.
        reply: Whether to reply to ``e`` (only applied when ``e`` is a
            ``Message``).
        chat: Destination chat; defaults to ``e.chat_id``.

    Returns:
        The sent message, or ``None`` when the inline path fails (the error
        is logged).
    """
    if e.client._bot:
        return await e.reply(msg, file=media, buttons=button)
    num = len(STUFF) + 1
    STUFF.update({num: {"msg": msg, "media": media, "button": button}})
    # ``reply_to`` must be a message (ID) or None; a bare bool would be taken
    # as a message ID (True == 1) instead of pointing at ``e``.
    reply_id = e.id if (reply and isinstance(e, Message)) else None
    try:
        res = await e.client.inline_query(asst.me.username, f"stf{num}")
        return await res[0].click(
            chat or e.chat_id,
            reply_to=reply_id,
            hide_via=True,
            silent=True,
        )
    except Exception as er:
        LOGS.exception(er)
|
|