|
import traceback |
|
import time |
|
import aiohttp |
|
import platform |
|
import asyncio |
|
import os |
|
|
|
from pyrogram import filters |
|
from pyrogram.errors import MessageDeleteForbidden |
|
from pyrogram.types import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup |
|
from pyrogram.types import * |
|
from pyrogram import Client |
|
|
|
from pyrogram import * |
|
from akn import CMD_HELP, app |
|
from akn.Akeno.helper.data import Data |
|
from akn.Akeno.helper.inline import paginate_help, cb_wrapper |
|
from akn import ids as users |
|
from config import * |
|
import requests |
|
from datetime import datetime as dt |
|
from platform import python_version |
|
from datetime import datetime as dt, timedelta |
|
from pytz import timezone |
|
|
|
from pyrogram import __version__ as pyro |
|
from pyrogram.types import Message |
|
from telethon import __version__ as tele |
|
from psutil import cpu_percent, virtual_memory, disk_usage, boot_time |
|
from pymongo import MongoClient |
|
|
|
client_mongo = MongoClient(MONGO_URL) |
|
|
|
db = client_mongo["tiktokbot"] |
|
collection = db["users"] |
|
|
|
def get_expired_date(user_id): |
|
user = collection.find_one({"user_id": user_id}) |
|
if user: |
|
return user.get("expire_date") |
|
else: |
|
return None |
|
|
|
def get_ch_help(user_id): |
|
token = collection.find_one({"user_id": user_id}) |
|
if token: |
|
return token |
|
else: |
|
return None |
|
|
|
def get_alive_logo(user_id): |
|
token = collection.find_one({"user_id": user_id}) |
|
if token: |
|
getvar = token.get("alive_logo") |
|
return getvar |
|
else: |
|
return None |
|
|
|
def get_asupan_database(user_id): |
|
token = collection.find_one({"user_id": user_id}) |
|
if token: |
|
get_supan = token.get("asupan") if token else None |
|
return get_supan |
|
else: |
|
return None |
|
|
|
def get_api_rmbg(user_id): |
|
token = collection.find_one({"user_id": user_id}) |
|
if token: |
|
get_rmbg = token.get("api_rmbg") if token else None |
|
return get_rmbg |
|
else: |
|
return None |
|
|
|
def get_install_peer(user_id): |
|
user_install = collection.find_one({"user_id": user_id}) |
|
if user_install: |
|
peer_users = user_install.get("peer_users_2") |
|
peer_group = user_install.get("peer_group_2") |
|
return peer_users, peer_group |
|
else: |
|
return None |
|
|
|
@Client.on_callback_query() |
|
async def _callbacks(_, callback_query: CallbackQuery): |
|
query = callback_query.data.lower() |
|
bot_me = await app.get_me() |
|
if query == "helper": |
|
buttons = paginate_help(0, CMD_HELP, "helpme") |
|
await app.edit_inline_text( |
|
callback_query.inline_message_id, |
|
Data.text_help_menu, |
|
disable_web_page_preview=True, |
|
reply_markup=InlineKeyboardMarkup(buttons), |
|
) |
|
elif query == "close": |
|
await app.edit_inline_text(callback_query.inline_message_id, "**β CLOSED**") |
|
return |
|
elif query == "close_help": |
|
if callback_query.from_user.id not in users: |
|
return |
|
await app.edit_inline_text( |
|
callback_query.inline_message_id, |
|
"**β CLOSED MENU HELP**", |
|
reply_markup=InlineKeyboardMarkup(Data.reopen), |
|
) |
|
return |
|
elif query == "closed": |
|
try: |
|
await callback_query.message.delete() |
|
except BaseException: |
|
pass |
|
try: |
|
await callback_query.message.reply_to_message.delete() |
|
except BaseException: |
|
pass |
|
elif query == "make_basic_button": |
|
try: |
|
bttn = paginate_help(0, CMD_HELP, "helpme") |
|
await app.edit_inline_text( |
|
callback_query.inline_message_id, |
|
Data.text_help_menu, |
|
disable_web_page_preview=True, |
|
reply_markup=InlineKeyboardMarkup(bttn), |
|
) |
|
except Exception as e: |
|
e = traceback.format_exc() |
|
print(e, "Callbacks") |
|
|
|
@app.on_callback_query(filters.regex("^pingCB")) |
|
async def pingCallback(_, cb: CallbackQuery): |
|
start_time = time.time() |
|
async with aiohttp.ClientSession() as session: |
|
async with session.get("https://api.telegram.org") as response: |
|
if response.status != 200: |
|
pass |
|
end_time = time.time() |
|
ping_time = round((end_time - start_time) * 1000, 3) |
|
await cb.answer(f'Pong! {ping_time}ms') |
|
|
|
@app.on_callback_query(filters.regex("ub_modul_(.*)")) |
|
@cb_wrapper |
|
async def on_plug_in_cb(_, callback_query: CallbackQuery): |
|
modul_name = callback_query.matches[0].group(1) |
|
user_id = callback_query.from_user.id |
|
username = "Β©οΈ Copyright 2019-2024" |
|
commands: dict = CMD_HELP[modul_name] |
|
this_command = f"ββγ **Help For {str(modul_name).upper()}** γββ\n\n" |
|
for x in commands: |
|
this_command += f" β’ **Command:** `.{str(x)}`\n β’ **Function:** `{str(commands[x])}`\n\n" |
|
this_command += "{}".format(username) |
|
bttn = [ |
|
[InlineKeyboardButton(text="Return", callback_data="reopen")], |
|
] |
|
reply_pop_up_alert = ( |
|
this_command |
|
if this_command is not None |
|
else f"{module_name} No documentation has been written for the module." |
|
) |
|
await app.edit_inline_text( |
|
callback_query.inline_message_id, |
|
reply_pop_up_alert, |
|
disable_web_page_preview=True, |
|
reply_markup=InlineKeyboardMarkup(bttn), |
|
) |
|
|
|
|
|
@app.on_callback_query(filters.regex("reopen")) |
|
@cb_wrapper |
|
async def reopen_in_cb(_, callback_query: CallbackQuery): |
|
buttons = paginate_help(0, CMD_HELP, "helpme") |
|
await app.edit_inline_text( |
|
callback_query.inline_message_id, |
|
Data.text_help_menu, |
|
disable_web_page_preview=True, |
|
reply_markup=InlineKeyboardMarkup(buttons), |
|
) |
|
|
|
|
|
@app.on_callback_query(filters.regex("helpme_prev\((.+?)\)")) |
|
@cb_wrapper |
|
async def on_plug_prev_in_cb(_, callback_query: CallbackQuery): |
|
current_page_number = int(callback_query.matches[0].group(1)) |
|
buttons = paginate_help(current_page_number - 1, CMD_HELP, "helpme") |
|
await app.edit_inline_text( |
|
callback_query.inline_message_id, |
|
Data.text_help_menu, |
|
disable_web_page_preview=True, |
|
reply_markup=InlineKeyboardMarkup(buttons), |
|
) |
|
|
|
|
|
@app.on_callback_query(filters.regex("helpme_next\((.+?)\)")) |
|
@cb_wrapper |
|
async def on_plug_next_in_cb(_, callback_query: CallbackQuery): |
|
current_page_number = int(callback_query.matches[0].group(1)) |
|
buttons = paginate_help(current_page_number + 1, CMD_HELP, "helpme") |
|
await app.edit_inline_text( |
|
callback_query.inline_message_id, |
|
Data.text_help_menu, |
|
disable_web_page_preview=True, |
|
reply_markup=InlineKeyboardMarkup(buttons), |
|
) |
|
|