|
import time |
|
import requests |
|
import json |
|
import traceback |
|
from sys import version as pyver |
|
import os |
|
import shlex |
|
import textwrap |
|
from typing import Tuple |
|
import asyncio |
|
from gc import get_objects |
|
from datetime import datetime as dt |
|
|
|
from pyrogram import * |
|
from pyrogram.types import * |
|
from pyrogram import Client, filters |
|
from pyrogram import __version__ as pyrover |
|
from pyrogram.enums import ParseMode |
|
from pyrogram.types import ( |
|
InlineKeyboardButton, |
|
InlineKeyboardMarkup, |
|
InlineQueryResultArticle, |
|
InputTextMessageContent, |
|
Message, |
|
) |
|
|
|
from akn import CMD_HELP, StartTime, app, START_TIME |
|
from akn.Akeno.helper.data import Data |
|
from akn.Akeno.helper.inline import inline_wrapper, paginate_help |
|
from akn.Akeno.bot import * |
|
from pymongo import MongoClient |
|
from config import MONGO_URL |
|
from RyuzakiLib import __version__ as ryu |
|
|
|
client_mongo = MongoClient(MONGO_URL) |
|
|
|
db = client_mongo["tiktokbot"] |
|
collection = db["users"] |
|
|
|
def get_verify_id_card(user_id): |
|
prem_filter = collection.find_one({"user_id": user_id}) |
|
if prem_filter: |
|
return prem_filter.get("idcard") |
|
else: |
|
return None |
|
|
|
def get_expired_date(user_id): |
|
user = collection.find_one({"user_id": user_id}) |
|
if user: |
|
return user.get("expire_date") |
|
else: |
|
return None |
|
|
|
def get_via_inline_bot(user_id): |
|
get_inline = collection.find_one({"user_id": user_id}) |
|
ping_dc = get_inline["ping_dc"] if get_inline else None |
|
peer_users = get_inline["peer_users"] if get_inline else None |
|
peer_group = get_inline["peer_group"] if get_inline else None |
|
return ping_dc, peer_users, peer_group |
|
|
|
def get_install_peer(user_id): |
|
user_install = collection.find_one({"user_id": user_id}) |
|
if user_install: |
|
peer_users = user_install.get("peer_users_2") |
|
peer_group = user_install.get("peer_group_2") |
|
peer_channel = user_install.get("peer_channel_2") |
|
return peer_users, peer_group, peer_channel |
|
else: |
|
return None |
|
|
|
def get_test_button(user_id): |
|
token = collection.find_one({"user_id": user_id}) |
|
if token: |
|
tiktok_text = token.get("tiktok_text") |
|
tiktok_button = token.get("tiktok_button") |
|
return tiktok_text, tiktok_button |
|
else: |
|
return None |
|
|
|
async def get_readable_time(seconds: int) -> str: |
|
count = 0 |
|
up_time = "" |
|
time_list = [] |
|
time_suffix_list = ["s", "m", "Jam", "Hari"] |
|
|
|
while count < 4: |
|
count += 1 |
|
remainder, result = divmod(seconds, 60) if count < 3 else divmod(seconds, 24) |
|
if seconds == 0 and remainder == 0: |
|
break |
|
time_list.append(int(result)) |
|
seconds = int(remainder) |
|
|
|
for x in range(len(time_list)): |
|
time_list[x] = str(time_list[x]) + time_suffix_list[x] |
|
if len(time_list) == 4: |
|
up_time += time_list.pop() + ", " |
|
|
|
time_list.reverse() |
|
up_time += ":".join(time_list) |
|
|
|
return up_time |
|
|
|
async def stats_function(message, answers): |
|
user_id = message._client.me.id |
|
users = 0 |
|
group = 0 |
|
async for dialog in message._client.get_dialogs(): |
|
if dialog.chat.type == enums.ChatType.PRIVATE: |
|
users += 1 |
|
elif dialog.chat.type in (enums.ChatType.GROUP, enums.ChatType.SUPERGROUP): |
|
group += 1 |
|
if message._client.me.id == 1191668125: |
|
status = "[DEV]" |
|
elif message._client.me.id == 901878554: |
|
status = "[ADMIN]" |
|
else: |
|
status = "[FREE USER]" |
|
start = dt.now() |
|
await message._client.invoke(Ping(ping_id=0)) |
|
ping = (dt.now() - start).microseconds / 1000 |
|
uptime = await get_readable_time((time.time() - StartTime)) |
|
msg = f""" |
|
|
|
<b>Tiktok Ubot</b> |
|
<b>Status : {status}</b> |
|
<b>expired_on:</b> <code>unlimited</code> |
|
<b>full_name:</b> <code>{message._client.me.first_name}</code> |
|
<b>premium:</b> <code>{message._client.me.is_premium}</code> |
|
<b>dc_id:</b> <code>{message._client.me.dc_id}</code> |
|
<b>ping_dc:</b> <code>{ping} ms</code> |
|
<b>peer_users:</b> <code>{users} users</code> |
|
<b>peer_group:</b> <code>{group} group</code> |
|
|
|
""" |
|
answers.append( |
|
InlineQueryResultArticle( |
|
title="Alivei", |
|
description="alive inline", |
|
thumb_url="https://telegra.ph/file/b4cdd0843ac94d0309ba7.jpg", |
|
input_message_content=InputTextMessageContent( |
|
msg, parse_mode=ParseMode.HTML, disable_web_page_preview=True |
|
), |
|
reply_markup=InlineKeyboardMarkup( |
|
[[InlineKeyboardButton("Ping", callback_data="pingCB")]] |
|
), |
|
) |
|
) |
|
return answers |
|
|
|
|
|
async def alive_function(message: Message, answers): |
|
uptime = await get_readable_time((time.time() - StartTime)) |
|
user_id = message._client.me.id |
|
users = 0 |
|
group = 0 |
|
async for dialog in message._client.get_dialogs(): |
|
if dialog.chat.type == enums.ChatType.PRIVATE: |
|
users += 1 |
|
elif dialog.chat.type in (enums.ChatType.GROUP, enums.ChatType.SUPERGROUP): |
|
group += 1 |
|
if message._client.me.id == 1191668125: |
|
status = "[DEV]" |
|
elif message._client.me.id == 901878554: |
|
status = "[ADMIN]" |
|
else: |
|
status = "[FREE USER]" |
|
start = dt.now() |
|
await message._client.invoke(Ping(ping_id=0)) |
|
ping = (dt.now() - start).microseconds / 1000 |
|
uptime = await get_readable_time((time.time() - StartTime)) |
|
msg = f""" |
|
|
|
<b>Tiktok Ubot</b> |
|
<b>Status : {status}</b> |
|
<b>expired_on:</b> <code>unlimited</code> |
|
<b>full_name:</b> <code>{message._client.me.first_name}</code> |
|
<b>premium:</b> <code>{message._client.me.is_premium}</code> |
|
<b>dc_id:</b> <code>{message._client.me.dc_id}</code> |
|
<b>ping_dc:</b> <code>{ping} ms</code> |
|
<b>peer_users:</b> <code>{users} users</code> |
|
<b>peer_group:</b> <code>{group} group</code> |
|
""" |
|
|
|
answers.append( |
|
InlineQueryResultArticle( |
|
title="Alive", |
|
description="Check Bot's Stats", |
|
thumb_url="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg", |
|
input_message_content=InputTextMessageContent( |
|
msg, parse_mode=ParseMode.HTML, disable_web_page_preview=True |
|
), |
|
reply_markup=InlineKeyboardMarkup( |
|
[[InlineKeyboardButton("Ping", callback_data="pingCB")]] |
|
), |
|
) |
|
) |
|
return answers |
|
|
|
async def ping_function(query, answers): |
|
uptime = await get_readable_time((time.time() - StartTime)) |
|
msg = f""" |
|
• <b>ᴜᴘᴛɪᴍᴇ:</b> <code>{str(dt.now() - START_TIME).split('.')[0]}</code> |
|
""" |
|
answers.append( |
|
InlineQueryResultArticle( |
|
title="Ping", |
|
description="ping test", |
|
thumb_url="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg", |
|
input_message_content=InputTextMessageContent( |
|
msg, parse_mode=ParseMode.HTML, disable_web_page_preview=True |
|
), |
|
reply_markup=InlineKeyboardMarkup( |
|
[[InlineKeyboardButton("Ping", callback_data="pingCB")]] |
|
), |
|
) |
|
) |
|
return answers |
|
|
|
|
|
async def help_function(answers): |
|
bttn = paginate_help(0, CMD_HELP, "helpme") |
|
answers.append( |
|
InlineQueryResultArticle( |
|
title="Help Article!", |
|
description="Check Command List & Help", |
|
thumb_url="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg", |
|
input_message_content=InputTextMessageContent( |
|
Data.text_help_menu.format(len(CMD_HELP)), disable_web_page_preview=True |
|
), |
|
reply_markup=InlineKeyboardMarkup(bttn), |
|
) |
|
) |
|
return answers |
|
|
|
@app.on_inline_query(filters.regex("bkp")) |
|
async def inline_query_handler(client: Client, message: Message, query): |
|
command_args = query.query.split()[1:] |
|
if len(command_args) != 1: |
|
return |
|
|
|
link = command_args[0] |
|
blacklist_url = "https://raw.githubusercontent.com/xtsea/pyKillerX/main/blacklist_channel.json" |
|
response = requests.get(blacklist_url) |
|
if response.status_code == 200: |
|
blacklist_str = response.text.strip() |
|
blacklist = json.loads(blacklist_str) |
|
else: |
|
print("Failed to fetch blacklist") |
|
return |
|
|
|
if any(link.startswith(prefix) for prefix in blacklist): |
|
results = [ |
|
InlineQueryResultArticle( |
|
title="Error", |
|
input_message_content=InputTextMessageContent( |
|
"Sorry, this command is not allowed in this channel/group." |
|
), |
|
) |
|
] |
|
else: |
|
await client.copy_message(message.chat.id, from_chat_id=chat_id, message_id=message_id, caption=None) |
|
results = [] |
|
|
|
await client.answer_inline_query(query.id, results=results, cache_time=0) |
|
|
|
|
|
|
|
@app.on_callback_query(filters.regex("^closeuser_")) |
|
async def closeuser(bot: Client, cb: CallbackQuery): |
|
data = cb.data.split("_") |
|
user_id = int(data[1]) |
|
|
|
if cb.from_user.id == user_id: |
|
try: |
|
await bot.delete_messages(cb.message.chat.id, message_ids=cb.inline_message_id) |
|
except Exception as e: |
|
print(str(e)) |
|
else: |
|
await cb.answer("Not allowed user.", True) |
|
|
|
@app.on_inline_query(filters.regex("test")) |
|
@inline_wrapper |
|
async def hello_world(client: Client, query: InlineQuery): |
|
user_id = query.from_user.id |
|
peer_users, peer_group, peer_channel = get_install_peer(user_id) |
|
start = dt.now() |
|
ping = (dt.now() - start).microseconds / 1000 |
|
tiktok_text, tiktok_button = get_test_button(user_id) |
|
if tiktok_text and tiktok_button: |
|
reply_markup = InlineKeyboardMarkup( |
|
[ |
|
[ |
|
InlineKeyboardButton( |
|
text=f"{tiktok_text}", |
|
url=f"{tiktok_button}", |
|
) |
|
] |
|
] |
|
) |
|
else: |
|
reply_markup = InlineKeyboardMarkup( |
|
[ |
|
[ |
|
InlineKeyboardButton( |
|
text="OWNER", |
|
user_id=user_id |
|
) |
|
] |
|
] |
|
) |
|
user = get_verify_id_card(user_id) |
|
if user: |
|
expired_on = "Unlimited" |
|
else: |
|
expired_on = get_expired_date(user_id) |
|
if expired_on is None: |
|
expired_on = "Unlimited" |
|
else: |
|
expired_on = expired_on.strftime("%d-%m-%Y") |
|
prem = await client.get_users(user_id) |
|
if prem.is_premium: |
|
msg = f""" |
|
<b>Akeno Ubot</b> |
|
<b>Status :</b> <i>Ultra Diamond</i> |
|
<b>premium:</b> <code>{prem.is_premium}</code> |
|
<b>expired_on:</b> <code>{expired_on}</code> |
|
<b>dc_id:</b> <code>{prem.dc_id}</code> |
|
<b>ping_dc:</b> <code>{ping}</code> |
|
<b>peer_users:</b> <code>{peer_users}</code> |
|
<b>peer_group:</b> <code>{peer_group}</code> |
|
<b>peer_channel:</b> <code>{peer_channel}</code> |
|
<b>akeno_uptime:</b> <code>{str(dt.now() - dt.now()).split('.')[0]}</code> |
|
<b>ryuzaki_library:</b> <code>{ryu}</code> |
|
""" |
|
else: |
|
msg = f""" |
|
<b>Akeno Ubot</b> |
|
<b>Status :</b> <i>Non-Ultra</i> |
|
<b>premium:</b> <code>Non-Premium</code> |
|
<b>expired_on:</b> <code>{expired_on}</code> |
|
<b>dc_id:</b> <code>{prem.dc_id}</code> |
|
<b>peer_users:</b> <code>{peer_users}</code> |
|
<b>peer_group:</b> <code>{peer_group}</code> |
|
<b>peer_channel:</b> <code>{peer_channel}</code> |
|
<b>akeno_uptime:</b> <code>{str(dt.now() - dt.now()).split('.')[0]}</code> |
|
<b>ryuzaki_library:</b> <code>{ryu}</code> |
|
""" |
|
photo_url = "https://telegra.ph/file/08dcd28e164b111e56546.jpg" |
|
results = [ |
|
InlineQueryResultPhoto( |
|
photo_url=photo_url, |
|
caption=msg, |
|
input_message_content=InputTextMessageContent( |
|
msg, parse_mode=ParseMode.HTML |
|
), |
|
reply_markup=reply_markup |
|
) |
|
] |
|
await client.answer_inline_query(query.id, results=results, cache_time=0) |
|
|
|
|
|
@app.on_inline_query(filters.regex("status_tools")) |
|
@inline_wrapper |
|
async def hello_world_2(client: Client, query: InlineQuery): |
|
user_id = query.from_user.id |
|
data_user = await client.get_users(user_id) |
|
first_name = data_user.first_name |
|
username = data_user.username if data_user else None |
|
token = collection.find_one({"user_id": user_id}) |
|
prem_filter = token.get("premium_users") if token else None |
|
expired = get_expired_date(user_id) |
|
reply_markup = InlineKeyboardMarkup( |
|
[ |
|
[ |
|
InlineKeyboardButton( |
|
text="MENU", |
|
callback_data="check_back", |
|
) |
|
] |
|
] |
|
) |
|
if prem_filter: |
|
status = "" |
|
status += f"<b>Status</b> : <code>Running</code>\n" |
|
status += f"<b>Expired_on</b> : <code>{expired}</code>\n" |
|
status += f"<b>First Name</b> : <code>{first_name}</code>\n" |
|
status += f"<b>Username</b> : <code>@{username}</code>\n" |
|
status += f"<b>UserID</b> : <code>{user_id}</code>\n" |
|
photo_url = "https://telegra.ph/file/08dcd28e164b111e56546.jpg" |
|
results = [ |
|
InlineQueryResultPhoto( |
|
photo_url=photo_url, |
|
caption=status, |
|
input_message_content=InputTextMessageContent( |
|
status, parse_mode=ParseMode.HTML |
|
), |
|
reply_markup=reply_markup |
|
) |
|
] |
|
await client.answer_inline_query(query.id, results=results, cache_time=0) |
|
|
|
@app.on_inline_query() |
|
@inline_wrapper |
|
async def inline_query_handler(client: Client, query): |
|
try: |
|
text = query.query.strip().lower() |
|
string_given = query.query.lower() |
|
answers = [] |
|
if text.strip() == "": |
|
return |
|
elif text.split()[0] == "taiok": |
|
answerss = await alive_function(query, answers) |
|
await client.answer_inline_query(query.id, results=answerss, cache_time=10) |
|
elif string_given.startswith("helper"): |
|
answers = await help_function(answers) |
|
await client.answer_inline_query(query.id, results=answers, cache_time=0) |
|
elif string_given.startswith("ping"): |
|
answers = await ping_function(query, answers) |
|
await client.answer_inline_query(query.id, results=answers, cache_time=10) |
|
elif string_given.startswith("stats"): |
|
m = [obj for obj in get_objects() if id(obj) == int(query.query.split(None, 1)[1])][0] |
|
answers = await stats_function(m, answers) |
|
await client.answer_inline_query(query.id, results=answers, cache_time=10) |
|
except Exception as e: |
|
e = traceback.format_exc() |
|
print(e, "InLine") |
|
|