randydev's picture
fix revert back and update
21bc372
import traceback
import time
import aiohttp
import platform
import asyncio
import os
from pyrogram import filters
from pyrogram.errors import MessageDeleteForbidden
from pyrogram.types import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup
from pyrogram.types import *
from pyrogram import Client
from pyrogram import *
from akn import CMD_HELP, app
from akn.Akeno.helper.data import Data
from akn.Akeno.helper.inline import paginate_help, cb_wrapper
from akn import ids as users
from config import *
import requests
from datetime import datetime as dt
from platform import python_version
from datetime import datetime as dt, timedelta
from pytz import timezone
from pyrogram import __version__ as pyro
from pyrogram.types import Message
from telethon import __version__ as tele
from psutil import cpu_percent, virtual_memory, disk_usage, boot_time
from pymongo import MongoClient
client_mongo = MongoClient(MONGO_URL)
db = client_mongo["tiktokbot"]
collection = db["users"]
def get_expired_date(user_id):
user = collection.find_one({"user_id": user_id})
if user:
return user.get("expire_date")
else:
return None
def get_ch_help(user_id):
token = collection.find_one({"user_id": user_id})
if token:
return token
else:
return None
def get_alive_logo(user_id):
token = collection.find_one({"user_id": user_id})
if token:
getvar = token.get("alive_logo")
return getvar
else:
return None
def get_asupan_database(user_id):
token = collection.find_one({"user_id": user_id})
if token:
get_supan = token.get("asupan") if token else None
return get_supan
else:
return None
def get_api_rmbg(user_id):
token = collection.find_one({"user_id": user_id})
if token:
get_rmbg = token.get("api_rmbg") if token else None
return get_rmbg
else:
return None
def get_install_peer(user_id):
user_install = collection.find_one({"user_id": user_id})
if user_install:
peer_users = user_install.get("peer_users_2")
peer_group = user_install.get("peer_group_2")
return peer_users, peer_group
else:
return None
@Client.on_callback_query()
async def _callbacks(_, callback_query: CallbackQuery):
query = callback_query.data.lower()
bot_me = await app.get_me()
if query == "helper":
buttons = paginate_help(0, CMD_HELP, "helpme")
await app.edit_inline_text(
callback_query.inline_message_id,
Data.text_help_menu,
disable_web_page_preview=True,
reply_markup=InlineKeyboardMarkup(buttons),
)
elif query == "close":
await app.edit_inline_text(callback_query.inline_message_id, "**β€” CLOSED**")
return
elif query == "close_help":
if callback_query.from_user.id not in users:
return
await app.edit_inline_text(
callback_query.inline_message_id,
"**β€” CLOSED MENU HELP**",
reply_markup=InlineKeyboardMarkup(Data.reopen),
)
return
elif query == "closed":
try:
await callback_query.message.delete()
except BaseException:
pass
try:
await callback_query.message.reply_to_message.delete()
except BaseException:
pass
elif query == "make_basic_button":
try:
bttn = paginate_help(0, CMD_HELP, "helpme")
await app.edit_inline_text(
callback_query.inline_message_id,
Data.text_help_menu,
disable_web_page_preview=True,
reply_markup=InlineKeyboardMarkup(bttn),
)
except Exception as e:
e = traceback.format_exc()
print(e, "Callbacks")
@app.on_callback_query(filters.regex("^pingCB"))
async def pingCallback(_, cb: CallbackQuery):
start_time = time.time()
async with aiohttp.ClientSession() as session:
async with session.get("https://api.telegram.org") as response:
if response.status != 200:
pass
end_time = time.time()
ping_time = round((end_time - start_time) * 1000, 3)
await cb.answer(f'Pong! {ping_time}ms')
@app.on_callback_query(filters.regex("ub_modul_(.*)"))
@cb_wrapper
async def on_plug_in_cb(_, callback_query: CallbackQuery):
modul_name = callback_query.matches[0].group(1)
user_id = callback_query.from_user.id
username = "©️ Copyright 2019-2024"
commands: dict = CMD_HELP[modul_name]
this_command = f"β”€β”€γ€Œ **Help For {str(modul_name).upper()}** 」──\n\n"
for x in commands:
this_command += f" β€’ **Command:** `.{str(x)}`\n β€’ **Function:** `{str(commands[x])}`\n\n"
this_command += "{}".format(username)
bttn = [
[InlineKeyboardButton(text="Return", callback_data="reopen")],
]
reply_pop_up_alert = (
this_command
if this_command is not None
else f"{module_name} No documentation has been written for the module."
)
await app.edit_inline_text(
callback_query.inline_message_id,
reply_pop_up_alert,
disable_web_page_preview=True,
reply_markup=InlineKeyboardMarkup(bttn),
)
@app.on_callback_query(filters.regex("reopen"))
@cb_wrapper
async def reopen_in_cb(_, callback_query: CallbackQuery):
buttons = paginate_help(0, CMD_HELP, "helpme")
await app.edit_inline_text(
callback_query.inline_message_id,
Data.text_help_menu,
disable_web_page_preview=True,
reply_markup=InlineKeyboardMarkup(buttons),
)
@app.on_callback_query(filters.regex("helpme_prev\((.+?)\)"))
@cb_wrapper
async def on_plug_prev_in_cb(_, callback_query: CallbackQuery):
current_page_number = int(callback_query.matches[0].group(1))
buttons = paginate_help(current_page_number - 1, CMD_HELP, "helpme")
await app.edit_inline_text(
callback_query.inline_message_id,
Data.text_help_menu,
disable_web_page_preview=True,
reply_markup=InlineKeyboardMarkup(buttons),
)
@app.on_callback_query(filters.regex("helpme_next\((.+?)\)"))
@cb_wrapper
async def on_plug_next_in_cb(_, callback_query: CallbackQuery):
current_page_number = int(callback_query.matches[0].group(1))
buttons = paginate_help(current_page_number + 1, CMD_HELP, "helpme")
await app.edit_inline_text(
callback_query.inline_message_id,
Data.text_help_menu,
disable_web_page_preview=True,
reply_markup=InlineKeyboardMarkup(buttons),
)