"""Inline-query and callback-query handlers plus MongoDB lookup helpers."""

# NOTE: the import order is kept exactly as found. The star imports can rebind
# names imported earlier, so regrouping could change which objects are in scope.
import time
import requests
import json
import traceback
from sys import version as pyver
import os
import shlex
import textwrap
from typing import Tuple
import asyncio
from gc import get_objects
from datetime import datetime as dt
from pyrogram import *
from pyrogram.types import *
from pyrogram import Client, filters
from pyrogram import __version__ as pyrover
from pyrogram.enums import ParseMode
from pyrogram.types import (
    InlineKeyboardButton,
    InlineKeyboardMarkup,
    InlineQueryResultArticle,
    InputTextMessageContent,
    Message,
)
from akn import CMD_HELP, StartTime, app, START_TIME
from akn.Akeno.helper.data import Data
from akn.Akeno.helper.inline import inline_wrapper, paginate_help
from akn.Akeno.bot import *
from pymongo import MongoClient
from config import MONGO_URL
from RyuzakiLib import __version__ as ryu

client_mongo = MongoClient(MONGO_URL)
db = client_mongo["tiktokbot"]
collection = db["users"]

# Status labels shown for specific account ids; every other id is "[FREE USER]".
_STATUS_BY_USER_ID = {
    1191668125: "[DEV]",
    901878554: "[ADMIN]",
}
_DEFAULT_STATUS = "[FREE USER]"


def get_verify_id_card(user_id):
    """Return the ``idcard`` field of the user's document, or ``None``.

    ``None`` is returned both when no document matches ``user_id`` and when
    the document has no ``idcard`` key.
    """
    document = collection.find_one({"user_id": user_id})
    if not document:
        return None
    return document.get("idcard")


def get_expired_date(user_id):
    """Return the ``expire_date`` field of the user's document, or ``None``."""
    document = collection.find_one({"user_id": user_id})
    if not document:
        return None
    return document.get("expire_date")


def get_via_inline_bot(user_id):
    """Return ``(ping_dc, peer_users, peer_group)`` for the user.

    All three values are ``None`` when no document matches ``user_id``.

    Raises:
        KeyError: if the document exists but lacks one of the three keys
            (the fields are read with ``[]``, not ``.get``).
    """
    document = collection.find_one({"user_id": user_id})
    if not document:
        return None, None, None
    return document["ping_dc"], document["peer_users"], document["peer_group"]


def get_install_peer(user_id):
    """Return ``(peer_users_2, peer_group_2, peer_channel_2)`` for the user.

    Returns ``None`` (not a tuple) when no document matches ``user_id``, so
    callers must check before unpacking.
    """
    document = collection.find_one({"user_id": user_id})
    if not document:
        return None
    return (
        document.get("peer_users_2"),
        document.get("peer_group_2"),
        document.get("peer_channel_2"),
    )


def get_test_button(user_id):
    """Return ``(tiktok_text, tiktok_button)`` for the user.

    Returns ``None`` (not a tuple) when no document matches ``user_id``, so
    callers must check before unpacking.
    """
    document = collection.find_one({"user_id": user_id})
    if not document:
        return None
    return document.get("tiktok_text"), document.get("tiktok_button")


async def get_readable_time(seconds: int) -> str:
    """Format a duration in seconds as e.g. ``"2Hari, 5Jam:3m:7s"``.

    Units are emitted only up to the largest non-zero one; a duration of
    zero yields an empty string. Fractional seconds are truncated.

    Args:
        seconds: Duration in seconds (callers in this file pass a float).

    Returns:
        The formatted duration. The day part, when present, is separated
        from the rest by ``", "``; the remaining parts are joined by ``":"``.
    """
    suffixes = ("s", "m", "Jam", "Hari")
    values = []
    remaining = seconds

    # Peel off seconds, minutes and hours; stop once nothing is left.
    for divisor in (60, 60, 24):
        if remaining == 0:
            break
        quotient, rest = divmod(remaining, divisor)
        values.append(int(rest))
        remaining = int(quotient)

    # Whatever is left is whole days. The previous code applied another
    # ``divmod(..., 24)`` here, which made the day count wrap around at 24.
    if len(values) == 3 and remaining != 0:
        values.append(remaining)

    parts = [f"{value}{suffix}" for value, suffix in zip(values, suffixes)]

    up_time = ""
    if len(parts) == 4:
        up_time += parts.pop() + ", "
    parts.reverse()
    up_time += ":".join(parts)
    return up_time


async def _build_status_text(client) -> str:
    """Build the status text shared by ``stats_function`` and ``alive_function``.

    Iterates the client's dialogs to count private chats and (super)groups,
    then times a ``Ping`` invocation on the same client.
    """
    users = 0
    groups = 0
    async for dialog in client.get_dialogs():
        chat_type = dialog.chat.type
        if chat_type == enums.ChatType.PRIVATE:
            users += 1
        elif chat_type in (enums.ChatType.GROUP, enums.ChatType.SUPERGROUP):
            groups += 1

    me = client.me
    status = _STATUS_BY_USER_ID.get(me.id, _DEFAULT_STATUS)

    started = dt.now()
    await client.invoke(Ping(ping_id=0))
    # total_seconds() keeps whole seconds; ``.microseconds`` alone would
    # under-report any round trip of one second or more.
    ping = round((dt.now() - started).total_seconds() * 1000, 3)

    # NOTE(review): fields are separated by single spaces exactly as found in
    # the source; the original template may have used line breaks or markup
    # that was lost - confirm against the upstream file.
    return (
        f" Tiktok Ubot Status : {status}"
        " expired_on: unlimited"
        f" full_name: {me.first_name}"
        f" premium: {me.is_premium}"
        f" dc_id: {me.dc_id}"
        f" ping_dc: {ping} ms"
        f" peer_users: {users} users"
        f" peer_group: {groups} group "
    )


def _ping_article(title, description, thumb_url, text):
    """Build an HTML inline article carrying a single "Ping" callback button."""
    return InlineQueryResultArticle(
        title=title,
        description=description,
        thumb_url=thumb_url,
        input_message_content=InputTextMessageContent(
            text, parse_mode=ParseMode.HTML, disable_web_page_preview=True
        ),
        reply_markup=InlineKeyboardMarkup(
            [[InlineKeyboardButton("Ping", callback_data="pingCB")]]
        ),
    )


async def stats_function(message, answers):
    """Append the "Alivei" status article to ``answers`` and return it.

    Args:
        message: Object exposing the bound client as ``_client``.
        answers: List of inline results; mutated in place.
    """
    text = await _build_status_text(message._client)
    answers.append(
        _ping_article(
            "Alivei",
            "alive inline",
            "https://telegra.ph/file/b4cdd0843ac94d0309ba7.jpg",
            text,
        )
    )
    return answers


async def alive_function(message: Message, answers):
    """Append the "Alive" status article to ``answers`` and return it.

    Args:
        message: Object exposing the bound client as ``_client``.
        answers: List of inline results; mutated in place.
    """
    text = await _build_status_text(message._client)
    answers.append(
        _ping_article(
            "Alive",
            "Check Bot's Stats",
            "https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
            text,
        )
    )
    return answers


async def ping_function(query, answers):
    """Append an article showing time elapsed since ``START_TIME``.

    ``query`` is accepted for signature compatibility and is not read.
    """
    elapsed = str(dt.now() - START_TIME).split(".")[0]
    text = f" • ᴜᴘᴛɪᴍᴇ: {elapsed} "
    answers.append(
        _ping_article(
            "Ping",
            "ping test",
            "https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
            text,
        )
    )
    return answers


async def help_function(answers):
    """Append the paginated help-menu article to ``answers`` and return it."""
    buttons = paginate_help(0, CMD_HELP, "helpme")
    help_text = Data.text_help_menu.format(len(CMD_HELP))
    answers.append(
        InlineQueryResultArticle(
            title="Help Article!",
            description="Check Command List & Help",
            thumb_url="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
            input_message_content=InputTextMessageContent(
                help_text, disable_web_page_preview=True
            ),
            reply_markup=InlineKeyboardMarkup(buttons),
        )
    )
    return answers


# NOTE(review): this name is rebound by the catch-all handler defined at the
# end of the module. Registration happens in the decorator, so both handlers
# stay registered, but only the later function is reachable by name.
@app.on_inline_query(filters.regex("bkp"))
async def inline_query_handler(client: Client, message: Message, query=None):
    """Handle ``bkp <link>`` inline queries.

    Inline-query callbacks are invoked with ``(client, inline_query)``, so the
    inline query arrives in the second positional parameter. ``query`` is kept
    for backward compatibility and falls back to that parameter when omitted.
    """
    if query is None:
        query = message

    command_args = query.query.split()[1:]
    if len(command_args) != 1:
        return
    link = command_args[0]

    blacklist_url = "https://raw.githubusercontent.com/xtsea/pyKillerX/main/blacklist_channel.json"
    # requests is blocking; run it in the default executor (with a timeout)
    # so the event loop is not stalled while the blacklist downloads.
    loop = asyncio.get_running_loop()
    try:
        response = await loop.run_in_executor(
            None, lambda: requests.get(blacklist_url, timeout=10)
        )
    except requests.RequestException:
        print("Failed to fetch blacklist")
        return
    if response.status_code != 200:
        print("Failed to fetch blacklist")
        return
    try:
        blacklist = json.loads(response.text.strip())
    except ValueError:
        print("Failed to fetch blacklist")
        return

    if any(link.startswith(prefix) for prefix in blacklist):
        results = [
            InlineQueryResultArticle(
                title="Error",
                input_message_content=InputTextMessageContent(
                    "Sorry, this command is not allowed in this channel/group."
                ),
            )
        ]
    else:
        # NOTE(review): ``chat_id`` and ``message_id`` are not defined in the
        # visible code, so this call cannot succeed as written. The intended
        # source of those values (presumably parsed from ``link``) must be
        # confirmed - left unchanged.
        await client.copy_message(message.chat.id, from_chat_id=chat_id, message_id=message_id, caption=None)
        results = []

    await client.answer_inline_query(query.id, results=results, cache_time=0)


@app.on_callback_query(filters.regex("^closeuser_"))
async def closeuser(bot: Client, cb: CallbackQuery):
    """Delete the message when the presser matches the id in the callback data.

    The callback data has the form ``closeuser_<user_id>``. Any other user
    gets an alert saying they are not allowed.
    """
    owner_id = int(cb.data.split("_")[1])
    if cb.from_user.id != owner_id:
        await cb.answer("Not allowed user.", True)
        return

    try:
        # NOTE(review): this mixes ``cb.message`` with ``cb.inline_message_id``;
        # presumably only one of them is set for a given callback - confirm.
        await bot.delete_messages(cb.message.chat.id, message_ids=cb.inline_message_id)
    except Exception as e:
        # Best effort: a failed delete is reported but must not crash the handler.
        print(str(e))


@app.on_inline_query(filters.regex("test"))
@inline_wrapper
async def hello_world(client: Client, query: InlineQuery):
    """Answer a ``test`` inline query with the caller's status as a photo result."""
    user_id = query.from_user.id

    # Both lookups return None (not a tuple) for unknown users; fall back to
    # empty tuples so the unpacking below cannot raise TypeError.
    peer_users, peer_group, peer_channel = get_install_peer(user_id) or (None, None, None)
    tiktok_text, tiktok_button = get_test_button(user_id) or (None, None)

    if tiktok_text and tiktok_button:
        button = InlineKeyboardButton(text=f"{tiktok_text}", url=f"{tiktok_button}")
    else:
        button = InlineKeyboardButton(text="OWNER", user_id=user_id)
    reply_markup = InlineKeyboardMarkup([[button]])

    if get_verify_id_card(user_id):
        expired_on = "Unlimited"
    else:
        expire_date = get_expired_date(user_id)
        if expire_date is None:
            expired_on = "Unlimited"
        else:
            expired_on = expire_date.strftime("%d-%m-%Y")

    # The previous code took two timestamps back to back, so "ping" measured
    # nothing. Time the one Telegram round trip this handler makes instead.
    started = dt.now()
    prem = await client.get_users(user_id)
    ping = round((dt.now() - started).total_seconds() * 1000, 3)

    # Was ``dt.now() - dt.now()``, which never reflects real uptime; use
    # START_TIME the same way ping_function does.
    uptime = str(dt.now() - START_TIME).split(".")[0]

    # NOTE(review): fields are separated by single spaces exactly as found in
    # the source; the original template layout may have been lost - confirm.
    tail = (
        f" peer_users: {peer_users}"
        f" peer_group: {peer_group}"
        f" peer_channel: {peer_channel}"
        f" akeno_uptime: {uptime}"
        f" ryuzaki_library: {ryu} "
    )
    if prem.is_premium:
        msg = (
            " Akeno Ubot Status : Ultra Diamond"
            f" premium: {prem.is_premium}"
            f" expired_on: {expired_on}"
            f" dc_id: {prem.dc_id}"
            f" ping_dc: {ping}"
        ) + tail
    else:
        msg = (
            " Akeno Ubot Status : Non-Ultra"
            " premium: Non-Premium"
            f" expired_on: {expired_on}"
            f" dc_id: {prem.dc_id}"
        ) + tail

    photo_url = "https://telegra.ph/file/08dcd28e164b111e56546.jpg"
    results = [
        InlineQueryResultPhoto(
            photo_url=photo_url,
            caption=msg,
            input_message_content=InputTextMessageContent(
                msg, parse_mode=ParseMode.HTML
            ),
            reply_markup=reply_markup,
        )
    ]
    await client.answer_inline_query(query.id, results=results, cache_time=0)


@app.on_inline_query(filters.regex("status_tools"))
@inline_wrapper
async def hello_world_2(client: Client, query: InlineQuery):
    """Answer a ``status_tools`` inline query for users flagged ``premium_users``.

    Users without a truthy ``premium_users`` field receive no answer.
    """
    user_id = query.from_user.id
    data_user = await client.get_users(user_id)
    first_name = data_user.first_name
    username = data_user.username

    # One lookup serves both fields; get_expired_date() would fetch the same
    # document a second time.
    document = collection.find_one({"user_id": user_id})
    prem_filter = document.get("premium_users") if document else None
    expired = document.get("expire_date") if document else None

    # Guard clause: nothing below is defined for non-premium users.
    if not prem_filter:
        return

    reply_markup = InlineKeyboardMarkup(
        [[InlineKeyboardButton(text="MENU", callback_data="check_back")]]
    )
    status = (
        "Status : Running\n"
        f"Expired_on : {expired}\n"
        f"First Name : {first_name}\n"
        f"Username : @{username}\n"
        f"UserID : {user_id}\n"
    )
    photo_url = "https://telegra.ph/file/08dcd28e164b111e56546.jpg"
    results = [
        InlineQueryResultPhoto(
            photo_url=photo_url,
            caption=status,
            input_message_content=InputTextMessageContent(
                status, parse_mode=ParseMode.HTML
            ),
            reply_markup=reply_markup,
        )
    ]
    await client.answer_inline_query(query.id, results=results, cache_time=0)


@app.on_inline_query()
@inline_wrapper
async def inline_query_handler(client: Client, query):
    """Dispatch unfiltered inline queries by keyword.

    ``taiok`` is matched against the first word of the stripped query;
    ``helper``, ``ping`` and ``stats`` are matched as prefixes of the raw
    lower-cased query. Any exception is printed and swallowed.
    """
    try:
        text = query.query.strip().lower()
        string_given = query.query.lower()
        answers = []

        if not text:
            return

        if text.split()[0] == "taiok":
            answerss = await alive_function(query, answers)
            await client.answer_inline_query(query.id, results=answerss, cache_time=10)
        elif string_given.startswith("helper"):
            answers = await help_function(answers)
            await client.answer_inline_query(query.id, results=answers, cache_time=0)
        elif string_given.startswith("ping"):
            answers = await ping_function(query, answers)
            await client.answer_inline_query(query.id, results=answers, cache_time=10)
        elif string_given.startswith("stats"):
            # NOTE(review): the query argument is an ``id()`` value used to find
            # a live object via the garbage collector; it comes straight from
            # the inline query text, so whoever can send the query picks the
            # object - confirm this is acceptable.
            # Parse once; the old comprehension re-parsed it for every object.
            target_id = int(query.query.split(None, 1)[1])
            m = next((obj for obj in get_objects() if id(obj) == target_id), None)
            if m is None:
                print(f"No live object with id {target_id}", "InLine")
                return
            answers = await stats_function(m, answers)
            await client.answer_inline_query(query.id, results=answers, cache_time=10)
    except Exception:
        # Top-level boundary for this handler: report the failure and move on.
        print(traceback.format_exc(), "InLine")