bash / assistant /callbackstuffs.py
azils3's picture
Upload 216 files
618430a verified
# Ultroid - UserBot
# Copyright (C) 2021-2025 TeamUltroid
#
# This file is a part of < https://github.com/TeamUltroid/Ultroid/ >
# PLease read the GNU Affero General Public License in
# <https://www.github.com/TeamUltroid/Ultroid/blob/main/LICENSE/>.
import ast
import asyncio
import re
import sys
import time
from asyncio.exceptions import TimeoutError as AsyncTimeOut
from os import execl, remove
from random import choice
from bs4 import BeautifulSoup as bs
try:
from pyUltroid.fns.gDrive import GDriveManager
except ImportError:
GDriveManager = None
from telethon import Button, events
from catbox import CatboxUploader
from telethon.tl.types import MessageMediaWebPage
from telethon.utils import get_peer_id
from pyUltroid.fns.helper import fast_download, progress
from pyUltroid.fns.tools import Carbon, async_searcher, get_paste, telegraph_client
from pyUltroid.startup.loader import Loader
from . import *
# --------------------------------------------------------------------#
telegraph = telegraph_client()
GDrive = GDriveManager() if GDriveManager else None
uploader = CatboxUploader()
# --------------------------------------------------------------------#
def text_to_url(event):
"""function to get media url (with|without) Webpage"""
if isinstance(event.media, MessageMediaWebPage):
webpage = event.media.webpage
if not isinstance(webpage, types.WebPageEmpty) and webpage.type in ["photo"]:
return webpage.display_url
return event.text
# --------------------------------------------------------------------#
_buttons = {
"otvars": {
"text": "Other Variables to set for @TeamUltroid:",
"buttons": [
[
Button.inline("Tᴀɢ Lᴏɢɢᴇʀ", data="taglog"),
Button.inline("SᴜᴘᴇʀFʙᴀɴ", data="cbs_sfban"),
],
[
Button.inline("Sᴜᴅᴏ Mᴏᴅᴇ", data="sudo"),
Button.inline("Hᴀɴᴅʟᴇʀ", data="hhndlr"),
],
[
Button.inline("Exᴛʀᴀ Pʟᴜɢɪɴs", data="plg"),
Button.inline("Aᴅᴅᴏɴs", data="eaddon"),
],
[
Button.inline("Eᴍᴏᴊɪ ɪɴ Hᴇʟᴘ", data="emoj"),
Button.inline("Sᴇᴛ ɢDʀɪᴠᴇ", data="gdrive"),
],
[
Button.inline("Iɴʟɪɴᴇ Pɪᴄ", data="inli_pic"),
Button.inline("Sᴜᴅᴏ HNDLR", data="shndlr"),
],
[Button.inline("Dᴜᴀʟ Mᴏᴅᴇ", "cbs_oofdm")],
[Button.inline("« Bᴀᴄᴋ", data="setter")],
],
},
"sfban": {
"text": "SuperFban Settings:",
"buttons": [
[Button.inline("FBᴀɴ Gʀᴏᴜᴘ", data="sfgrp")],
[Button.inline("Exᴄʟᴜᴅᴇ Fᴇᴅs", data="abs_sfexf")],
[Button.inline("« Bᴀᴄᴋ", data="cbs_otvars")],
],
},
"apauto": {
"text": "This'll auto approve on outgoing messages",
"buttons": [
[Button.inline("Aᴜᴛᴏ Aᴘᴘʀᴏᴠᴇ ON", data="apon")],
[Button.inline("Aᴜᴛᴏ Aᴘᴘʀᴏᴠᴇ OFF", data="apof")],
[Button.inline("« Bᴀᴄᴋ", data="cbs_pmcstm")],
],
},
"alvcstm": {
"text": f"Customise your {HNDLR}alive. Choose from the below options -",
"buttons": [
[Button.inline("Aʟɪᴠᴇ Tᴇxᴛ", data="abs_alvtx")],
[Button.inline("Aʟɪᴠᴇ ᴍᴇᴅɪᴀ", data="alvmed")],
[Button.inline("Dᴇʟᴇᴛᴇ Aʟɪᴠᴇ Mᴇᴅɪᴀ", data="delmed")],
[Button.inline("« Bᴀᴄᴋ", data="setter")],
],
},
"pmcstm": {
"text": "Customise your PMPERMIT Settings -",
"buttons": [
[
Button.inline("Pᴍ Tᴇxᴛ", data="pmtxt"),
Button.inline("Pᴍ Mᴇᴅɪᴀ", data="pmmed"),
],
[
Button.inline("Aᴜᴛᴏ Aᴘᴘʀᴏᴠᴇ", data="cbs_apauto"),
Button.inline("PMLOGGER", data="pml"),
],
[
Button.inline("Sᴇᴛ Wᴀʀɴs", data="swarn"),
Button.inline("Dᴇʟᴇᴛᴇ Pᴍ Mᴇᴅɪᴀ", data="delpmmed"),
],
[Button.inline("PMPermit Type", data="cbs_pmtype")],
[Button.inline("« Bᴀᴄᴋ", data="cbs_ppmset")],
],
},
"pmtype": {
"text": "Select the type of PMPermit needed.",
"buttons": [
[Button.inline("Inline", data="inpm_in")],
[Button.inline("Normal", data="inpm_no")],
[Button.inline("« Bᴀᴄᴋ", data="cbs_pmcstm")],
],
},
"ppmset": {
"text": "PMPermit Settings:",
"buttons": [
[Button.inline("Tᴜʀɴ PMPᴇʀᴍɪᴛ Oɴ", data="pmon")],
[Button.inline("Tᴜʀɴ PMPᴇʀᴍɪᴛ Oғғ", data="pmoff")],
[Button.inline("Cᴜsᴛᴏᴍɪᴢᴇ PMPᴇʀᴍɪᴛ", data="cbs_pmcstm")],
[Button.inline("« Bᴀᴄᴋ", data="setter")],
],
},
"chatbot": {
"text": "From This Feature U can chat with ppls Via ur Assistant Bot.\n[More info](https://t.me/UltroidUpdates/2)",
"buttons": [
[
Button.inline("Cʜᴀᴛ Bᴏᴛ Oɴ", data="onchbot"),
Button.inline("Cʜᴀᴛ Bᴏᴛ Oғғ", data="ofchbot"),
],
[
Button.inline("Bᴏᴛ Wᴇʟᴄᴏᴍᴇ", data="bwel"),
Button.inline("Bᴏᴛ Wᴇʟᴄᴏᴍᴇ Mᴇᴅɪᴀ", data="botmew"),
],
[Button.inline("Bᴏᴛ Iɴғᴏ Tᴇxᴛ", data="botinfe")],
[Button.inline("Fᴏʀᴄᴇ Sᴜʙsᴄʀɪʙᴇ", data="pmfs")],
[Button.inline("« Bᴀᴄᴋ", data="setter")],
],
},
"vcb": {
"text": "From This Feature U can play songs in group voice chat\n\n[moreinfo](https://t.me/UltroidUpdates/4)",
"buttons": [
[Button.inline("VC Sᴇssɪᴏɴ", data="abs_vcs")],
[Button.inline("« Bᴀᴄᴋ", data="setter")],
],
},
"oofdm": {
"text": "About [Dual Mode](https://t.me/UltroidUpdates/18)",
"buttons": [
[
Button.inline("Dᴜᴀʟ Mᴏᴅᴇ Oɴ", "dmof"),
Button.inline("Dᴜᴀʟ Mᴏᴅᴇ Oғғ", "dmof"),
],
[Button.inline("Dᴜᴀʟ Mᴏᴅᴇ Hɴᴅʟʀ", "dmhn")],
[Button.inline("« Back", data="cbs_otvars")],
],
},
"apiset": {
"text": get_string("ast_1"),
"buttons": [
[Button.inline("Remove.bg API", data="abs_rmbg")],
[Button.inline("DEEP API", data="abs_dapi")],
[Button.inline("OCR API", data="abs_oapi")],
[Button.inline("« Back", data="setter")],
],
},
}
_convo = {
"rmbg": {
"var": "RMBG_API",
"name": "Remove.bg API Key",
"text": get_string("ast_2"),
"back": "cbs_apiset",
},
"dapi": {
"var": "DEEP_AI",
"name": "Deep AI Api Key",
"text": "Get Your Deep Api from deepai.org and send here.",
"back": "cbs_apiset",
},
"oapi": {
"var": "OCR_API",
"name": "Ocr Api Key",
"text": "Get Your OCR api from ocr.space and send that Here.",
"back": "cbs_apiset",
},
"pmlgg": {
"var": "PMLOGGROUP",
"name": "Pm Log Group",
"text": "Send chat id of chat which you want to save as Pm log Group.",
"back": "pml",
},
"vcs": {
"var": "VC_SESSION",
"name": "Vc Session",
"text": "**Vc session**\nEnter the New session u generated for vc bot.\n\nUse /cancel to terminate the operation.",
"back": "cbs_vcb",
},
"settag": {
"var": "TAG_LOG",
"name": "Tag Log Group",
"text": f"Make a group, add your assistant and make it admin.\nGet the `{HNDLR}id` of that group and send it here for tag logs.\n\nUse /cancel to cancel.",
"back": "taglog",
},
"alvtx": {
"var": "ALIVE_TEXT",
"name": "Alive Text",
"text": "**Alive Text**\nEnter the new alive text.\n\nUse /cancel to terminate the operation.",
"back": "cbs_alvcstm",
},
"sfexf": {
"var": "EXCLUDE_FED",
"name": "Excluded Fed",
"text": "Send the Fed IDs you want to exclude in the ban. Split by a space.\neg`id1 id2 id3`\nSet is as `None` if you dont want any.\nUse /cancel to go back.",
"back": "cbs_sfban",
},
}
TOKEN_FILE = "resources/auths/auth_token.txt"
@callback(
re.compile(
"sndplug_(.*)",
),
owner=True,
)
async def send(eve):
key, name = (eve.data_match.group(1)).decode("UTF-8").split("_")
thumb = "resources/extras/inline.jpg"
await eve.answer("■ Sending ■")
data = f"uh_{key}_"
index = None
if "|" in name:
name, index = name.split("|")
key = "plugins" if key == "Official" else key.lower()
plugin = f"{key}/{name}.py"
_ = f"pasta-{plugin}"
if index is not None:
data += f"|{index}"
_ += f"|{index}"
buttons = [
[
Button.inline(
"« Pᴀsᴛᴇ »",
data=_,
)
],
[
Button.inline("« Bᴀᴄᴋ", data=data),
],
]
try:
await eve.edit(file=plugin, thumb=thumb, buttons=buttons)
except Exception as er:
await eve.answer(str(er), alert=True)
heroku_api, app_name = Var.HEROKU_API, Var.HEROKU_APP_NAME
@callback("updatenow", owner=True)
async def update(eve):
repo = Repo()
ac_br = repo.active_branch
ups_rem = repo.remote("upstream")
if heroku_api:
import heroku3
try:
heroku = heroku3.from_key(heroku_api)
heroku_app = None
heroku_applications = heroku.apps()
except BaseException as er:
LOGS.exception(er)
return await eve.edit("`Wrong HEROKU_API.`")
for app in heroku_applications:
if app.name == app_name:
heroku_app = app
if not heroku_app:
await eve.edit("`Wrong HEROKU_APP_NAME.`")
repo.__del__()
return
await eve.edit(get_string("clst_1"))
ups_rem.fetch(ac_br)
repo.git.reset("--hard", "FETCH_HEAD")
heroku_git_url = heroku_app.git_url.replace(
"https://", f"https://api:{heroku_api}@"
)
if "heroku" in repo.remotes:
remote = repo.remote("heroku")
remote.set_url(heroku_git_url)
else:
remote = repo.create_remote("heroku", heroku_git_url)
try:
remote.push(refspec=f"HEAD:refs/heads/{ac_br}", force=True)
except GitCommandError as error:
await eve.edit(f"`Here is the error log:\n{error}`")
repo.__del__()
return
await eve.edit("`Successfully Updated!\nRestarting, please wait...`")
else:
await eve.edit(get_string("clst_1"))
call_back()
await bash("git pull && pip3 install -r requirements.txt")
await bash("pip3 install -r requirements.txt --break-system-packages")
execl(sys.executable, sys.executable, "-m", "pyUltroid")
@callback(re.compile("changes(.*)"), owner=True)
async def changes(okk):
match = okk.data_match.group(1).decode("utf-8")
await okk.answer(get_string("clst_3"))
repo = Repo.init()
button = [[Button.inline("Update Now", data="updatenow")]]
changelog, tl_chnglog = await gen_chlog(
repo, f"HEAD..upstream/{repo.active_branch}"
)
cli = "\n\nClick the below button to update!"
if not match:
try:
if len(tl_chnglog) > 700:
tl_chnglog = f"{tl_chnglog[:700]}..."
button.append([Button.inline("View Complete", "changesall")])
await okk.edit("• Writing Changelogs 📝 •")
img = await Carbon(
file_name="changelog",
code=tl_chnglog,
backgroundColor=choice(ATRA_COL),
language="md",
)
return await okk.edit(
f"**• Ultroid Userbot •**{cli}", file=img, buttons=button
)
except Exception as er:
LOGS.exception(er)
changelog_str = changelog + cli
if len(changelog_str) > 1024:
await okk.edit(get_string("upd_4"))
await asyncio.sleep(2)
with open("ultroid_updates.txt", "w+") as file:
file.write(tl_chnglog)
await okk.edit(
get_string("upd_5"),
file="ultroid_updates.txt",
buttons=button,
)
remove("ultroid_updates.txt")
return
await okk.edit(
changelog_str,
buttons=button,
parse_mode="html",
)
@callback(
re.compile(
"pasta-(.*)",
),
owner=True,
)
async def _(e):
ok = (e.data_match.group(1)).decode("UTF-8")
index = None
if "|" in ok:
ok, index = ok.split("|")
with open(ok, "r") as hmm:
_, data = await get_paste(hmm.read())
if not data.get("link"):
return await e.answer(key[:30], alert=True)
if not key.startswith("http"):
link, raw = data["link"], data["raw"]
else:
link = key
raw = f"{key}/raw"
if ok.startswith("addons"):
key = "Addons"
elif ok.startswith("vcbot"):
key = "VCBot"
else:
key = "Official"
data = f"uh_{key}_"
if index is not None:
data += f"|{index}"
await e.edit(
"",
buttons=[
[Button.url("Lɪɴᴋ", link), Button.url("Rᴀᴡ", raw)],
[Button.inline("« Bᴀᴄᴋ", data=data)],
],
)
@callback(re.compile("cbs_(.*)"), owner=True)
async def _edit_to(event):
match = event.data_match.group(1).decode("utf-8")
data = _buttons.get(match)
if not data:
return
await event.edit(data["text"], buttons=data["buttons"], link_preview=False)
@callback(re.compile("abs_(.*)"), owner=True)
async def convo_handler(event: events.CallbackQuery):
match = event.data_match.group(1).decode("utf-8")
if not _convo.get(match):
return
await event.delete()
get_ = _convo[match]
back = get_["back"]
async with event.client.conversation(event.sender_id) as conv:
await conv.send_message(get_["text"])
response = await conv.get_response()
themssg = response.message
try:
themssg = ast.literal_eval(themssg)
except Exception:
pass
if themssg == "/cancel":
return await conv.send_message(
"Cancelled!!",
buttons=get_back_button(back),
)
await setit(event, get_["var"], themssg)
await conv.send_message(
f"{get_['name']} changed to `{themssg}`",
buttons=get_back_button(back),
)
@callback("authorise", owner=True)
async def _(e):
if not e.is_private:
return
url = GDrive._create_token_file()
await e.edit("Go to the below link and send the code!")
async with asst.conversation(e.sender_id) as conv:
await conv.send_message(url)
code = await conv.get_response()
if GDrive._create_token_file(code=code.text):
await conv.send_message(
"`Success!\nYou are all set to use Google Drive with Ultroid Userbot.`",
buttons=Button.inline("Main Menu", data="setter"),
)
else:
await conv.send_message("Wrong code! Click authorise again.")
@callback("folderid", owner=True, func=lambda x: x.is_private)
async def _(e):
if not e.is_private:
return
msg = (
"Send your FOLDER ID\n\n"
+ "For FOLDER ID:\n"
+ "1. Open Google Drive App.\n"
+ "2. Create Folder.\n"
+ "3. Make that folder public.\n"
+ "4. Send link of that folder."
)
await e.delete()
async with asst.conversation(e.sender_id, timeout=150) as conv:
await conv.send_message(msg)
repl = await conv.get_response()
id = repl.text
if id.startswith("https"):
id = id.split("?id=")[-1]
udB.set_key("GDRIVE_FOLDER_ID", id)
await repl.reply(
"`Success.`",
buttons=get_back_button("gdrive"),
)
@callback("gdrive", owner=True)
async def _(e):
if not e.is_private:
return
await e.edit(
"Click Authorise and send the code.\n\nYou can use your own CLIENT ID and SECRET by [this](https://t.me/UltroidUpdates/37)",
buttons=[
[
Button.inline("Folder ID", data="folderid"),
Button.inline("Authorise", data="authorise"),
],
[Button.inline("« Back", data="cbs_otvars")],
],
link_preview=False,
)
@callback("dmof", owner=True)
async def rhwhe(e):
if udB.get_key("DUAL_MODE"):
udB.del_key("DUAL_MODE")
key = "Off"
else:
udB.set_key("DUAL_MODE", "True")
key = "On"
Msg = f"Dual Mode : {key}"
await e.edit(Msg, buttons=get_back_button("cbs_otvars"))
@callback("dmhn", owner=True)
async def hndlrr(event):
await event.delete()
pru = event.sender_id
var = "DUAL_HNDLR"
name = "Dual Handler"
CH = udB.get_key(var) or "/"
async with event.client.conversation(pru) as conv:
await conv.send_message(
f"Send The Symbol Which u want as Handler/Trigger to use your Assistant bot\nUr Current Handler is [ `{CH}` ]\n\n use /cancel to cancel.",
)
response = conv.wait_event(events.NewMessage(chats=pru))
response = await response
themssg = response.message.message
if themssg == "/cancel":
await conv.send_message(
"Cancelled!!",
buttons=get_back_button("cbs_otvars"),
)
elif len(themssg) > 1:
await conv.send_message(
"Incorrect Handler",
buttons=get_back_button("cbs_otvars"),
)
else:
await setit(event, var, themssg)
await conv.send_message(
f"{name} changed to {themssg}",
buttons=get_back_button("cbs_otvars"),
)
@callback("emoj", owner=True)
async def emoji(event):
await event.delete()
pru = event.sender_id
var = "EMOJI_IN_HELP"
name = f"Emoji in `{HNDLR}help` menu"
async with event.client.conversation(pru) as conv:
await conv.send_message("Send emoji u want to set 🙃.\n\nUse /cancel to cancel.")
response = conv.wait_event(events.NewMessage(chats=pru))
response = await response
themssg = response.message.message
if themssg == "/cancel":
await conv.send_message(
"Cancelled!!",
buttons=get_back_button("cbs_otvars"),
)
elif themssg.startswith(("/", HNDLR)):
await conv.send_message(
"Incorrect Emoji",
buttons=get_back_button("cbs_otvars"),
)
else:
await setit(event, var, themssg)
await conv.send_message(
f"{name} changed to {themssg}\n",
buttons=get_back_button("cbs_otvars"),
)
@callback("plg", owner=True)
async def pluginch(event):
await event.delete()
pru = event.sender_id
var = "PLUGIN_CHANNEL"
name = "Plugin Channel"
async with event.client.conversation(pru) as conv:
await conv.send_message(
"Send id or username of a channel from where u want to install all plugins\n\nOur Channel~ @ultroidplugins\n\nUse /cancel to cancel.",
)
response = conv.wait_event(events.NewMessage(chats=pru))
response = await response
themssg = response.message.message
if themssg == "/cancel":
await conv.send_message(
"Cancelled!!",
buttons=get_back_button("cbs_otvars"),
)
elif themssg.startswith(("/", HNDLR)):
await conv.send_message(
"Incorrect channel",
buttons=get_back_button("cbs_otvars"),
)
else:
await setit(event, var, themssg)
await conv.send_message(
f"{name} changed to {themssg}\n After Setting All Things Do Restart",
buttons=get_back_button("cbs_otvars"),
)
@callback("hhndlr", owner=True)
async def hndlrr(event):
await event.delete()
pru = event.sender_id
var = "HNDLR"
name = "Handler/ Trigger"
async with event.client.conversation(pru) as conv:
await conv.send_message(
f"Send The Symbol Which u want as Handler/Trigger to use bot\nUr Current Handler is [ `{HNDLR}` ]\n\n use /cancel to cancel.",
)
response = conv.wait_event(events.NewMessage(chats=pru))
response = await response
themssg = response.message.message
if themssg == "/cancel":
await conv.send_message(
"Cancelled!!",
buttons=get_back_button("cbs_otvars"),
)
elif len(themssg) > 1:
await conv.send_message(
"Incorrect Handler",
buttons=get_back_button("cbs_otvars"),
)
elif themssg.startswith(("/", "#", "@")):
await conv.send_message(
"This cannot be used as handler",
buttons=get_back_button("cbs_otvars"),
)
else:
await setit(event, var, themssg)
await conv.send_message(
f"{name} changed to {themssg}",
buttons=get_back_button("cbs_otvars"),
)
@callback("shndlr", owner=True)
async def hndlrr(event):
await event.delete()
pru = event.sender_id
var = "SUDO_HNDLR"
name = "Sudo Handler"
async with event.client.conversation(pru) as conv:
await conv.send_message(
"Send The Symbol Which u want as Sudo Handler/Trigger to use bot\n\n use /cancel to cancel."
)
response = conv.wait_event(events.NewMessage(chats=pru))
response = await response
themssg = response.message.message
if themssg == "/cancel":
await conv.send_message(
"Cancelled!!",
buttons=get_back_button("cbs_otvars"),
)
elif len(themssg) > 1:
await conv.send_message(
"Incorrect Handler",
buttons=get_back_button("cbs_otvars"),
)
elif themssg.startswith(("/", "#", "@")):
await conv.send_message(
"This cannot be used as handler",
buttons=get_back_button("cbs_otvars"),
)
else:
await setit(event, var, themssg)
await conv.send_message(
f"{name} changed to {themssg}",
buttons=get_back_button("cbs_otvars"),
)
@callback("taglog", owner=True)
async def tagloggrr(e):
BUTTON = [
[Button.inline("SET TAG LOG", data="abs_settag")],
[Button.inline("DELETE TAG LOG", data="deltag")],
get_back_button("cbs_otvars"),
]
await e.edit(
"Choose Options",
buttons=BUTTON,
)
@callback("deltag", owner=True)
async def _(e):
udB.del_key("TAG_LOG")
await e.answer("Done!!! Tag Logger has been turned Off")
@callback("eaddon", owner=True)
async def pmset(event):
BT = (
[Button.inline("Aᴅᴅᴏɴs Oғғ", data="edof")]
if udB.get_key("ADDONS")
else [Button.inline("Aᴅᴅᴏɴs Oɴ", data="edon")]
)
await event.edit(
"ADDONS~ Extra Plugins:",
buttons=[
BT,
[Button.inline("« Bᴀᴄᴋ", data="cbs_otvars")],
],
)
@callback("edon", owner=True)
async def eddon(event):
var = "ADDONS"
await setit(event, var, "True")
await event.edit(
"Done! ADDONS has been turned on!!\n\n After Setting All Things Do Restart",
buttons=get_back_button("eaddon"),
)
@callback("edof", owner=True)
async def eddof(event):
udB.set_key("ADDONS", "False")
await event.edit(
"Done! ADDONS has been turned off!! After Setting All Things Do Restart",
buttons=get_back_button("eaddon"),
)
@callback("sudo", owner=True)
async def pmset(event):
BT = (
[Button.inline("Sᴜᴅᴏ Mᴏᴅᴇ Oғғ", data="ofsudo")]
if udB.get_key("SUDO")
else [Button.inline("Sᴜᴅᴏ Mᴏᴅᴇ Oɴ", data="onsudo")]
)
await event.edit(
f"SUDO MODE ~ Some peoples can use ur Bot which u selected. To know More use `{HNDLR}help sudo`",
buttons=[
BT,
[Button.inline("« Bᴀᴄᴋ", data="cbs_otvars")],
],
)
@callback("onsudo", owner=True)
async def eddon(event):
var = "SUDO"
await setit(event, var, "True")
await event.edit(
"Done! SUDO MODE has been turned on!!\n\n After Setting All Things Do Restart",
buttons=get_back_button("sudo"),
)
@callback("ofsudo", owner=True)
async def eddof(event):
var = "SUDO"
await setit(event, var, "False")
await event.edit(
"Done! SUDO MODE has been turned off!! After Setting All Things Do Restart",
buttons=get_back_button("sudo"),
)
@callback("sfgrp", owner=True)
async def sfgrp(event):
await event.delete()
name = "FBan Group ID"
var = "FBAN_GROUP_ID"
pru = event.sender_id
async with asst.conversation(pru) as conv:
await conv.send_message(
f"Make a group, add @MissRose_Bot, send `{HNDLR}id`, copy that and send it here.\nUse /cancel to go back.",
)
response = conv.wait_event(events.NewMessage(chats=pru))
response = await response
themssg = response.message.message
if themssg == "/cancel":
return await conv.send_message(
"Cancelled!!",
buttons=get_back_button("cbs_sfban"),
)
await setit(event, var, themssg)
await conv.send_message(
f"{name} changed to {themssg}",
buttons=get_back_button("cbs_sfban"),
)
@callback("alvmed", owner=True)
async def media(event):
await event.delete()
pru = event.sender_id
var = "ALIVE_PIC"
name = "Alive Media"
async with event.client.conversation(pru) as conv:
await conv.send_message(
"**Alive Media**\nSend me a pic/gif/media to set as alive media.\n\nUse /cancel to terminate the operation.",
)
response = await conv.get_response()
try:
themssg = response.message
if themssg == "/cancel":
return await conv.send_message(
"Operation cancelled!!",
buttons=get_back_button("cbs_alvcstm"),
)
except BaseException as er:
LOGS.exception(er)
if (
not (response.text).startswith("/")
and response.text != ""
and (not response.media or isinstance(response.media, MessageMediaWebPage))
):
url = text_to_url(response)
elif response.sticker:
url = response.file.id
else:
media = await event.client.download_media(response, "alvpc")
try:
url = uploader.upload_file(media)
remove(media)
except BaseException as er:
LOGS.exception(er)
return await conv.send_message(
"Terminated.",
buttons=get_back_button("cbs_alvcstm"),
)
await setit(event, var, url)
await conv.send_message(
f"{name} has been set.",
buttons=get_back_button("cbs_alvcstm"),
)
@callback("delmed", owner=True)
async def dell(event):
try:
udB.del_key("ALIVE_PIC")
return await event.edit(
get_string("clst_5"), buttons=get_back_button("cbs_alabs_vcstm")
)
except BaseException as er:
LOGS.exception(er)
return await event.edit(
get_string("clst_4"),
buttons=get_back_button("cbs_alabs_vcstm"),
)
@callback("inpm_in", owner=True)
async def inl_on(event):
var = "INLINE_PM"
await setit(event, var, "True")
await event.edit(
"Done!! PMPermit type has been set to inline!",
buttons=[[Button.inline("« Bᴀᴄᴋ", data="cbs_pmtype")]],
)
@callback("inpm_no", owner=True)
async def inl_on(event):
var = "INLINE_PM"
await setit(event, var, "False")
await event.edit(
"Done!! PMPermit type has been set to normal!",
buttons=[[Button.inline("« Bᴀᴄᴋ", data="cbs_pmtype")]],
)
@callback("pmtxt", owner=True)
async def name(event):
await event.delete()
pru = event.sender_id
var = "PM_TEXT"
name = "PM Text"
async with event.client.conversation(pru) as conv:
await conv.send_message(
"**PM Text**\nEnter the new Pmpermit text.\n\nu can use `{name}` `{fullname}` `{count}` `{mention}` `{username}` to get this from user Too\n\nUse /cancel to terminate the operation.",
)
response = conv.wait_event(events.NewMessage(chats=pru))
response = await response
themssg = response.message.message
if themssg == "/cancel":
return await conv.send_message(
"Cancelled!!",
buttons=get_back_button("cbs_pmcstm"),
)
if len(themssg) > 4090:
return await conv.send_message(
"Message too long!\nGive a shorter message please!!",
buttons=get_back_button("cbs_pmcstm"),
)
await setit(event, var, themssg)
await conv.send_message(
f"{name} changed to {themssg}\n\nAfter Setting All Things Do restart",
buttons=get_back_button("cbs_pmcstm"),
)
@callback("swarn", owner=True)
async def name(event):
m = range(1, 10)
tultd = [Button.inline(f"{x}", data=f"wrns_{x}") for x in m]
lst = list(zip(tultd[::3], tultd[1::3], tultd[2::3]))
lst.append([Button.inline("« Bᴀᴄᴋ", data="cbs_pmcstm")])
await event.edit(
"Select the number of warnings for a user before getting blocked in PMs.",
buttons=lst,
)
@callback(re.compile(b"wrns_(.*)"), owner=True)
async def set_wrns(event):
value = int(event.data_match.group(1).decode("UTF-8"))
if dn := udB.set_key("PMWARNS", value):
await event.edit(
f"PM Warns Set to {value}.\nNew users will have {value} chances in PMs before getting banned.",
buttons=get_back_button("cbs_pmcstm"),
)
else:
await event.edit(
f"Something went wrong, please check your {HNDLR}logs!",
buttons=get_back_button("cbs_pmcstm"),
)
@callback("pmmed", owner=True)
async def media(event):
await event.delete()
pru = event.sender_id
var = "PMPIC"
name = "PM Media"
async with event.client.conversation(pru) as conv:
await conv.send_message(
"**PM Media**\nSend me a pic/gif/sticker/link to set as pmpermit media.\n\nUse /cancel to terminate the operation.",
)
response = await conv.get_response()
try:
themssg = response.message
if themssg == "/cancel":
return await conv.send_message(
"Operation cancelled!!",
buttons=get_back_button("cbs_pmcstm"),
)
except BaseException as er:
LOGS.exception(er)
media = await event.client.download_media(response, "pmpc")
if (
not (response.text).startswith("/")
and response.text != ""
and (not response.media or isinstance(response.media, MessageMediaWebPage))
):
url = text_to_url(response)
elif response.sticker:
url = response.file.id
else:
try:
url = uploader.upload_file(media)
remove(media)
except BaseException as er:
LOGS.exception(er)
return await conv.send_message(
"Terminated.",
buttons=get_back_button("cbs_pmcstm"),
)
await setit(event, var, url)
await conv.send_message(
f"{name} has been set.",
buttons=get_back_button("cbs_pmcstm"),
)
@callback("delpmmed", owner=True)
async def dell(event):
try:
udB.del_key("PMPIC")
return await event.edit(
get_string("clst_5"), buttons=get_back_button("cbs_pmcstm")
)
except BaseException as er:
LOGS.exception(er)
return await event.edit(
get_string("clst_4"),
buttons=[[Button.inline("« Sᴇᴛᴛɪɴɢs", data="setter")]],
)
@callback("apon", owner=True)
async def apon(event):
var = "AUTOAPPROVE"
await setit(event, var, "True")
await event.edit(
"Done!! AUTOAPPROVE Started!!",
buttons=[[Button.inline("« Bᴀᴄᴋ", data="cbs_apauto")]],
)
@callback("apof", owner=True)
async def apof(event):
try:
udB.set_key("AUTOAPPROVE", "False")
return await event.edit(
"Done! AUTOAPPROVE Stopped!!",
buttons=[[Button.inline("« Bᴀᴄᴋ", data="cbs_apauto")]],
)
except BaseException as er:
LOGS.exception(er)
return await event.edit(
get_string("clst_4"),
buttons=[[Button.inline("« Sᴇᴛᴛɪɴɢs", data="setter")]],
)
@callback("pml", owner=True)
async def l_vcs(event):
BT = (
[Button.inline("PMLOGGER OFF", data="pmlogof")]
if udB.get_key("PMLOG")
else [Button.inline("PMLOGGER ON", data="pmlog")]
)
await event.edit(
"PMLOGGER This Will Forward Ur Pm to Ur Private Group -",
buttons=[
BT,
[Button.inline("PᴍLᴏɢɢᴇʀ Gʀᴏᴜᴘ", "abs_pmlgg")],
[Button.inline("« Bᴀᴄᴋ", data="cbs_pmcstm")],
],
)
@callback("pmlog", owner=True)
async def pmlog(event):
await setit(event, "PMLOG", "True")
await event.edit(
"Done!! PMLOGGER Started!!",
buttons=[[Button.inline("« Bᴀᴄᴋ", data="pml")]],
)
@callback("pmlogof", owner=True)
async def pmlogof(event):
try:
udB.del_key("PMLOG")
return await event.edit(
"Done! PMLOGGER Stopped!!",
buttons=[[Button.inline("« Bᴀᴄᴋ", data="pml")]],
)
except BaseException as er:
LOGS.exception(er)
return await event.edit(
get_string("clst_4"),
buttons=[[Button.inline("« Sᴇᴛᴛɪɴɢs", data="setter")]],
)
@callback("pmon", owner=True)
async def pmonn(event):
var = "PMSETTING"
await setit(event, var, "True")
await event.edit(
"Done! PMPermit has been turned on!!",
buttons=[[Button.inline("« Bᴀᴄᴋ", data="cbs_ppmset")]],
)
@callback("pmoff", owner=True)
async def pmofff(event):
var = "PMSETTING"
await setit(event, var, "False")
await event.edit(
"Done! PMPermit has been turned off!!",
buttons=[[Button.inline("« Bᴀᴄᴋ", data="cbs_ppmset")]],
)
@callback("botmew", owner=True)
async def hhh(e):
async with e.client.conversation(e.chat_id) as conv:
await conv.send_message("Send Any Media to keep at your Bot's welcome ")
msg = await conv.get_response()
if not msg.media or msg.text.startswith("/"):
return await conv.send_message(
"Terminated!", buttons=get_back_button("cbs_chatbot")
)
udB.set_key("STARTMEDIA", msg.file.id)
await conv.send_message("Done!", buttons=get_back_button("cbs_chatbot"))
@callback("botinfe", owner=True)
async def hhh(e):
async with e.client.conversation(e.chat_id) as conv:
await conv.send_message(
"Send message to set to Display, when user Press Info button in Bot Welcome!\n\nsend `False` to completely remove that button.."
)
msg = await conv.get_response()
if msg.media or msg.text.startswith("/"):
return await conv.send_message(
"Terminated!", buttons=get_back_button("cbs_chatbot")
)
udB.set_key("BOT_INFO_START", msg.text)
await conv.send_message("Done!", buttons=get_back_button("cbs_chatbot"))
@callback("pmfs", owner=True)
async def heheh(event):
Ll = []
err = ""
async with event.client.conversation(event.chat_id) as conv:
await conv.send_message(
"• Send The Chat Id(s), which you want user to Join Before using Chat/Pm Bot\n\n• Send /clear to disable PmBot Force sub..\n• • Send /cancel to stop this process.."
)
await conv.send_message(
"Example : \n`-1001234567\n-100778888`\n\nFor Multiple Chat(s)."
)
try:
msg = await conv.get_response()
except AsyncTimeOut:
return await conv.send_message("**• TimeUp!**\nStart from /start back.")
if not msg.text or msg.text.startswith("/"):
timyork = "Cancelled!"
if msg.text == "/clear":
udB.del_key("PMBOT_FSUB")
timyork = "Done! Force Subscribe Stopped\nRestart your Bot!"
return await conv.send_message(
"Cancelled!", buttons=get_back_button("cbs_chatbot")
)
for chat in msg.message.split("\n"):
if chat.startswith("-") or chat.isdigit():
chat = int(chat)
try:
CHSJSHS = await event.client.get_entity(chat)
Ll.append(get_peer_id(CHSJSHS))
except Exception as er:
err += f"**{chat}** : {er}\n"
if err:
return await conv.send_message(err)
udB.set_key("PMBOT_FSUB", str(Ll))
await conv.send_message(
"Done!\nRestart Your Bot.", buttons=get_back_button("cbs_chatbot")
)
@callback("bwel", owner=True)
async def name(event):
await event.delete()
pru = event.sender_id
var = "STARTMSG"
name = "Bot Welcome Message:"
async with event.client.conversation(pru) as conv:
await conv.send_message(
"**BOT WELCOME MSG**\nEnter the msg which u want to show when someone start your assistant Bot.\nYou Can use `{me}` , `{mention}` Parameters Too\nUse /cancel to terminate the operation.",
)
response = conv.wait_event(events.NewMessage(chats=pru))
response = await response
themssg = response.message.message
if themssg == "/cancel":
return await conv.send_message(
"Cancelled!!",
buttons=get_back_button("cbs_chatbot"),
)
await setit(event, var, themssg)
await conv.send_message(
f"{name} changed to {themssg}",
buttons=get_back_button("cbs_chatbot"),
)
@callback("onchbot", owner=True)
async def chon(event):
var = "PMBOT"
await setit(event, var, "True")
Loader(path="assistant/pmbot.py", key="PM Bot").load()
if AST_PLUGINS.get("pmbot"):
for i, e in AST_PLUGINS["pmbot"]:
event.client.remove_event_handler(i)
for i, e in AST_PLUGINS["pmbot"]:
event.client.add_event_handler(i, events.NewMessage(**e))
await event.edit(
"Done! Now u Can Chat With People Via This Bot",
buttons=[Button.inline("« Bᴀᴄᴋ", data="cbs_chatbot")],
)
@callback("ofchbot", owner=True)
async def chon(event):
var = "PMBOT"
await setit(event, var, "False")
if AST_PLUGINS.get("pmbot"):
for i, e in AST_PLUGINS["pmbot"]:
event.client.remove_event_handler(i)
await event.edit(
"Done! Chat People Via This Bot Stopped.",
buttons=[Button.inline("« Bᴀᴄᴋ", data="cbs_chatbot")],
)
@callback("inli_pic", owner=True)
async def media(event):
await event.delete()
pru = event.sender_id
var = "INLINE_PIC"
name = "Inline Media"
async with event.client.conversation(pru) as conv:
await conv.send_message(
"**Inline Media**\nSend me a pic/gif/ or link to set as inline media.\n\nUse /cancel to terminate the operation.",
)
response = await conv.get_response()
try:
themssg = response.message
if themssg == "/cancel":
return await conv.send_message(
"Operation cancelled!!",
buttons=get_back_button("setter"),
)
except BaseException as er:
LOGS.exception(er)
media = await event.client.download_media(response, "inlpic")
if (
not (response.text).startswith("/")
and response.text != ""
and (not response.media or isinstance(response.media, MessageMediaWebPage))
):
url = text_to_url(response)
else:
try:
url = uploader.upload_file(media)
remove(media)
except BaseException as er:
LOGS.exception(er)
return await conv.send_message(
"Terminated.",
buttons=get_back_button("setter"),
)
await setit(event, var, url)
await conv.send_message(
f"{name} has been set.",
buttons=get_back_button("setter"),
)
FD_MEDIA = {}
@callback(re.compile("fd(.*)"), owner=True)
async def fdroid_dler(event):
uri = event.data_match.group(1).decode("utf-8")
if FD_MEDIA.get(uri):
return await event.edit(file=FD_MEDIA[uri])
await event.answer("• Starting Download •", alert=True)
await event.edit("• Downloading.. •")
URL = f"https://f-droid.org/packages/{uri}"
conte = await async_searcher(URL, re_content=True)
BSC = bs(conte, "html.parser", from_encoding="utf-8")
dl_ = BSC.find("p", "package-version-download").find("a")["href"]
title = BSC.find("h3", "package-name").text.strip()
thumb = BSC.find("img", "package-icon")["src"]
if thumb.startswith("/"):
thumb = f"https://f-droid.org{thumb}"
thumb, _ = await fast_download(thumb, filename=f"{uri}.png")
s_time = time.time()
file, _ = await fast_download(
dl_,
filename=f"{title}.apk",
progress_callback=lambda d, t: asyncio.get_event_loop().create_task(
progress(
d,
t,
event,
s_time,
"Downloading...",
)
),
)
time.time()
n_file = await event.client.fast_uploader(
file, show_progress=True, event=event, message="Uploading...", to_delete=True
)
buttons = Button.switch_inline("Search Back", query="fdroid", same_peer=True)
try:
msg = await event.edit(
f"**• [{title}]({URL}) •**", file=n_file, thumb=thumb, buttons=buttons
)
except Exception as er:
LOGS.exception(er)
try:
msg = await event.client.edit_message(
await event.get_input_chat(),
event.message_id,
f"**• [{title}]({URL}) •**",
buttons=buttons,
thumb=thumb,
file=n_file,
)
except Exception as er:
os.remove(thumb)
LOGS.exception(er)
return await event.edit(f"**ERROR**: `{er}`", buttons=buttons)
if msg and hasattr(msg, "media"):
FD_MEDIA.update({uri: msg.media})
os.remove(thumb)