import time
import requests
import json
import traceback
from sys import version as pyver
import os
import shlex
import textwrap
from typing import Tuple
import asyncio
from gc import get_objects
from datetime import datetime as dt
from pyrogram import *
from pyrogram.types import *
from pyrogram import Client, filters
from pyrogram import __version__ as pyrover
from pyrogram.enums import ParseMode
from pyrogram.types import (
InlineKeyboardButton,
InlineKeyboardMarkup,
InlineQueryResultArticle,
InputTextMessageContent,
Message,
)
from akn import CMD_HELP, StartTime, app, START_TIME
from akn.Akeno.helper.data import Data
from akn.Akeno.helper.inline import inline_wrapper, paginate_help
from akn.Akeno.bot import *
from pymongo import MongoClient
from config import MONGO_URL
from RyuzakiLib import __version__ as ryu
# MongoDB client created once at import time from the configured MONGO_URL.
client_mongo = MongoClient(MONGO_URL)
# Database handle used by this module.
db = client_mongo["tiktokbot"]
# Collection queried by the lookup helpers below via the "user_id" field.
collection = db["users"]
def get_verify_id_card(user_id):
    """Return the ``idcard`` value stored for ``user_id``.

    ``None`` is returned when no document matches ``user_id`` or when the
    matching document has no ``idcard`` field.
    """
    record = collection.find_one({"user_id": user_id})
    return record.get("idcard") if record else None
def get_expired_date(user_id):
    """Return the ``expire_date`` value stored for ``user_id``.

    ``None`` is returned when no document matches ``user_id`` or when the
    matching document has no ``expire_date`` field.
    """
    record = collection.find_one({"user_id": user_id})
    if not record:
        return None
    return record.get("expire_date")
def get_via_inline_bot(user_id):
    """Return ``(ping_dc, peer_users, peer_group)`` stored for ``user_id``.

    Always returns a 3-tuple. Every element is ``None`` when no document
    matches ``user_id``; an individual element is ``None`` when the document
    lacks that field.
    """
    record = collection.find_one({"user_id": user_id}) or {}
    # ``.get`` instead of indexing: a document that lacks one of the fields
    # yields ``None`` for it (as the sibling getters do) rather than KeyError.
    return (
        record.get("ping_dc"),
        record.get("peer_users"),
        record.get("peer_group"),
    )
def get_install_peer(user_id):
    """Return the ``*_2`` peer values stored for ``user_id``.

    Returns:
        A ``(peer_users, peer_group, peer_channel)`` tuple read from the
        ``peer_users_2``, ``peer_group_2`` and ``peer_channel_2`` fields
        (a missing field yields ``None`` in its slot), or a bare ``None``
        when no document matches ``user_id``.
    """
    record = collection.find_one({"user_id": user_id})
    if not record:
        return None
    field_names = ("peer_users_2", "peer_group_2", "peer_channel_2")
    return tuple(record.get(name) for name in field_names)
def get_test_button(user_id):
    """Return the custom button settings stored for ``user_id``.

    Returns:
        A ``(tiktok_text, tiktok_button)`` tuple read from the fields of the
        same names (a missing field yields ``None`` in its slot), or a bare
        ``None`` when no document matches ``user_id``.
    """
    record = collection.find_one({"user_id": user_id})
    if not record:
        return None
    return record.get("tiktok_text"), record.get("tiktok_button")
async def get_readable_time(seconds: int) -> str:
    """Format a duration in seconds as ``"<d>Hari, <h>Jam:<m>m:<s>s"``.

    Leading units that are zero are omitted (``59`` -> ``"59s"``,
    ``3661`` -> ``"1Jam:1m:1s"``) and a zero duration yields ``""``.

    Args:
        seconds: Duration in seconds; fractional values are truncated.

    Returns:
        The formatted duration string.
    """
    # (suffix, how many of this unit make up one of the next larger unit)
    units = (("s", 60), ("m", 60), ("Jam", 24))
    remaining = seconds
    parts = []
    for suffix, size in units:
        if remaining == 0:
            break
        carry, value = divmod(remaining, size)
        parts.append(f"{int(value)}{suffix}")
        remaining = int(carry)

    # Whatever is left after hours is the day count. It is emitted as-is:
    # the previous implementation reduced it modulo 24 as well, so e.g.
    # 30 days was rendered as "6Hari".
    days = f"{remaining}Hari, " if len(parts) == len(units) and remaining else ""
    return days + ":".join(reversed(parts))
async def stats_function(message, answers):
    """Append the "Alivei" status article to ``answers`` and return the list.

    Counts the private chats and (super)groups among the client's dialogs,
    times one ``Ping`` round trip, and renders both together with details of
    the logged-in account into the article text.

    Args:
        message: Any object exposing the Pyrogram client as ``_client``.
        answers: List of inline results to extend in place.

    Returns:
        The same ``answers`` list, with one article appended.
    """
    client = message._client
    me = client.me

    users = 0
    group = 0
    async for dialog in client.get_dialogs():
        chat_type = dialog.chat.type
        if chat_type == enums.ChatType.PRIVATE:
            users += 1
        elif chat_type in (enums.ChatType.GROUP, enums.ChatType.SUPERGROUP):
            group += 1

    if me.id == 1191668125:
        status = "[DEV]"
    elif me.id == 901878554:
        status = "[ADMIN]"
    else:
        status = "[FREE USER]"

    start = dt.now()
    await client.invoke(Ping(ping_id=0))
    # ``timedelta.microseconds`` is only the sub-second component, so a round
    # trip longer than one second was under-reported; use the full duration.
    ping = round((dt.now() - start).total_seconds() * 1000, 3)

    msg = f"""
Tiktok Ubot
Status : {status}
expired_on: unlimited
full_name: {me.first_name}
premium: {me.is_premium}
dc_id: {me.dc_id}
ping_dc: {ping} ms
peer_users: {users} users
peer_group: {group} group
"""
    answers.append(
        InlineQueryResultArticle(
            title="Alivei",
            description="alive inline",
            thumb_url="https://telegra.ph/file/b4cdd0843ac94d0309ba7.jpg",
            input_message_content=InputTextMessageContent(
                msg, parse_mode=ParseMode.HTML, disable_web_page_preview=True
            ),
            reply_markup=InlineKeyboardMarkup(
                [[InlineKeyboardButton("Ping", callback_data="pingCB")]]
            ),
        )
    )
    return answers
async def alive_function(message: Message, answers):
    """Append the "Alive" status article to ``answers`` and return the list.

    Counts the private chats and (super)groups among the client's dialogs,
    times one ``Ping`` round trip, and renders both together with details of
    the logged-in account into the article text.

    Args:
        message: Object exposing the Pyrogram client as ``_client``.
        answers: List of inline results to extend in place.

    Returns:
        The same ``answers`` list, with one article appended.
    """
    client = message._client
    me = client.me

    users = 0
    group = 0
    async for dialog in client.get_dialogs():
        chat_type = dialog.chat.type
        if chat_type == enums.ChatType.PRIVATE:
            users += 1
        elif chat_type in (enums.ChatType.GROUP, enums.ChatType.SUPERGROUP):
            group += 1

    if me.id == 1191668125:
        status = "[DEV]"
    elif me.id == 901878554:
        status = "[ADMIN]"
    else:
        status = "[FREE USER]"

    start = dt.now()
    await client.invoke(Ping(ping_id=0))
    # ``timedelta.microseconds`` is only the sub-second component, so a round
    # trip longer than one second was under-reported; use the full duration.
    ping = round((dt.now() - start).total_seconds() * 1000, 3)

    msg = f"""
Tiktok Ubot
Status : {status}
expired_on: unlimited
full_name: {me.first_name}
premium: {me.is_premium}
dc_id: {me.dc_id}
ping_dc: {ping} ms
peer_users: {users} users
peer_group: {group} group
"""
    answers.append(
        InlineQueryResultArticle(
            title="Alive",
            description="Check Bot's Stats",
            thumb_url="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
            input_message_content=InputTextMessageContent(
                msg, parse_mode=ParseMode.HTML, disable_web_page_preview=True
            ),
            reply_markup=InlineKeyboardMarkup(
                [[InlineKeyboardButton("Ping", callback_data="pingCB")]]
            ),
        )
    )
    return answers
async def ping_function(query, answers):
    """Append the "Ping" article to ``answers`` and return the list.

    The article text shows the time elapsed since ``START_TIME``, truncated
    to whole seconds.

    Args:
        query: The inline query being answered (not read by this function).
        answers: List of inline results to extend in place.

    Returns:
        The same ``answers`` list, with one article appended.
    """
    # The previously awaited ``get_readable_time`` result was never used, so
    # that call has been dropped; the text below derives uptime on its own.
    msg = f"""
• ᴜᴘᴛɪᴍᴇ: {str(dt.now() - START_TIME).split('.')[0]}
"""
    answers.append(
        InlineQueryResultArticle(
            title="Ping",
            description="ping test",
            thumb_url="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
            input_message_content=InputTextMessageContent(
                msg, parse_mode=ParseMode.HTML, disable_web_page_preview=True
            ),
            reply_markup=InlineKeyboardMarkup(
                [[InlineKeyboardButton("Ping", callback_data="pingCB")]]
            ),
        )
    )
    return answers
async def help_function(answers):
    """Append the help-menu article to ``answers`` and return the list.

    The article carries the first page of the paginated ``CMD_HELP`` keyboard
    and the help text formatted with the number of ``CMD_HELP`` entries.
    """
    keyboard_rows = paginate_help(0, CMD_HELP, "helpme")
    help_text = Data.text_help_menu.format(len(CMD_HELP))
    article = InlineQueryResultArticle(
        title="Help Article!",
        description="Check Command List & Help",
        thumb_url="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
        input_message_content=InputTextMessageContent(
            help_text, disable_web_page_preview=True
        ),
        reply_markup=InlineKeyboardMarkup(keyboard_rows),
    )
    answers.append(article)
    return answers
def _parse_message_link(link):
    """Split a message link into ``(chat, message_id)``; ``None`` if malformed.

    Accepts ``.../<username>/<id>`` and ``.../c/<internal_id>/<id>`` shapes.
    """
    parts = link.split("?", 1)[0].rstrip("/").split("/")
    if len(parts) < 2 or not parts[-1].isdigit() or not parts[-2]:
        return None
    message_id = int(parts[-1])
    chat_part = parts[-2]
    if len(parts) >= 3 and parts[-3] == "c" and chat_part.isdigit():
        # Private-chat links carry the bare channel id; the full chat id
        # is that number prefixed with -100.
        return int(f"-100{chat_part}"), message_id
    return chat_part, message_id


@app.on_inline_query(filters.regex("bkp"))
async def inline_query_handler(client: Client, message: Message, query=None):
    """Handle ``bkp <link>`` inline queries.

    Rejects links whose prefix appears in the remote blacklist; otherwise
    copies the linked message and answers the query with an empty result list.

    Args:
        client: The Pyrogram client that received the query.
        message: Kept for backward compatibility. When ``query`` is omitted
            this slot holds the inline query itself.
        query: The inline query, when supplied as a third argument.
    """
    # Pyrogram calls inline-query handlers as ``callback(client, inline_query)``.
    # With three required positionals the handler could never be invoked.
    if query is None:
        query, message = message, None

    command_args = query.query.split()[1:]
    if len(command_args) != 1:
        return
    link = command_args[0]

    blacklist_url = "https://raw.githubusercontent.com/xtsea/pyKillerX/main/blacklist_channel.json"
    # ``requests`` is blocking: run it off the event loop and bound its wait.
    loop = asyncio.get_running_loop()
    try:
        response = await loop.run_in_executor(
            None, lambda: requests.get(blacklist_url, timeout=10)
        )
    except requests.RequestException as exc:
        print(f"Failed to fetch blacklist: {exc}")
        return
    if response.status_code != 200:
        print("Failed to fetch blacklist")
        return
    try:
        blacklist = json.loads(response.text.strip())
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass.
        print("Failed to fetch blacklist")
        return

    if any(link.startswith(prefix) for prefix in blacklist):
        results = [
            InlineQueryResultArticle(
                title="Error",
                input_message_content=InputTextMessageContent(
                    "Sorry, this command is not allowed in this channel/group."
                ),
            )
        ]
    else:
        # NOTE(review): the original referenced undefined ``chat_id`` and
        # ``message_id`` here (NameError). They are now derived from the link,
        # the only input available - confirm this matches the intended source.
        source = _parse_message_link(link)
        if source is None:
            results = [
                InlineQueryResultArticle(
                    title="Error",
                    input_message_content=InputTextMessageContent(
                        "Invalid message link."
                    ),
                )
            ]
        else:
            source_chat, source_message_id = source
            # NOTE(review): an inline query carries no chat, so without a
            # ``message`` the copy goes to the querying user - confirm.
            target_chat = message.chat.id if message is not None else query.from_user.id
            await client.copy_message(
                target_chat,
                from_chat_id=source_chat,
                message_id=source_message_id,
                caption=None,
            )
            results = []
    await client.answer_inline_query(query.id, results=results, cache_time=0)
@app.on_callback_query(filters.regex("^closeuser_"))
async def closeuser(bot: Client, cb: CallbackQuery):
    """Delete the message carrying a ``closeuser_<user_id>`` button.

    Only the user whose id is embedded in the callback data may close the
    message; anyone else gets a "Not allowed user." alert.
    """
    try:
        user_id = int(cb.data.split("_")[1])
    except ValueError:
        # Malformed payload: nobody can be identified as the allowed user.
        user_id = None

    if user_id is None or cb.from_user.id != user_id:
        await cb.answer("Not allowed user.", True)
        return

    if cb.message is None:
        # Callbacks from inline-mode messages carry only ``inline_message_id``;
        # there is no chat/message pair to hand to ``delete_messages``.
        print("closeuser: no chat message attached to this callback")
        return

    try:
        # ``delete_messages`` expects integer message ids. The original passed
        # ``cb.inline_message_id`` (a string or None), so nothing was deleted.
        await bot.delete_messages(cb.message.chat.id, message_ids=cb.message.id)
    except Exception as e:
        print(str(e))
@app.on_inline_query(filters.regex("test"))
@inline_wrapper
async def hello_world(client: Client, query: InlineQuery):
    """Answer a ``test`` inline query with the caller's status card.

    The card shows the stored peer values, the expiry date, the account's
    premium flag and DC id, the uptime since ``START_TIME`` and the RyuzakiLib
    version. The button is the user's stored custom button when both its text
    and URL are set, otherwise an "OWNER" button for the user.
    """
    user_id = query.from_user.id

    # Both lookups return a bare ``None`` for users without a stored document;
    # unpacking that directly raised TypeError.
    peer_users, peer_group, peer_channel = get_install_peer(user_id) or (None, None, None)

    # NOTE(review): nothing is awaited between the two timestamps, so this
    # "ping" is effectively zero - confirm whether a Ping call was intended.
    start = dt.now()
    ping = (dt.now() - start).microseconds / 1000

    tiktok_text, tiktok_button = get_test_button(user_id) or (None, None)
    if tiktok_text and tiktok_button:
        button = InlineKeyboardButton(
            text=f"{tiktok_text}",
            url=f"{tiktok_button}",
        )
    else:
        button = InlineKeyboardButton(
            text="OWNER",
            user_id=user_id
        )
    reply_markup = InlineKeyboardMarkup([[button]])

    if get_verify_id_card(user_id):
        expired_on = "Unlimited"
    else:
        expired_on = get_expired_date(user_id)
        if expired_on is None:
            expired_on = "Unlimited"
        else:
            expired_on = expired_on.strftime("%d-%m-%Y")

    prem = await client.get_users(user_id)

    # Uptime is measured from START_TIME, as in ``ping_function``; the original
    # ``dt.now() - dt.now()`` was always zero or slightly negative.
    uptime = str(dt.now() - START_TIME).split('.')[0]

    if prem.is_premium:
        msg = f"""
Akeno Ubot
Status : Ultra Diamond
premium: {prem.is_premium}
expired_on: {expired_on}
dc_id: {prem.dc_id}
ping_dc: {ping}
peer_users: {peer_users}
peer_group: {peer_group}
peer_channel: {peer_channel}
akeno_uptime: {uptime}
ryuzaki_library: {ryu}
"""
    else:
        msg = f"""
Akeno Ubot
Status : Non-Ultra
premium: Non-Premium
expired_on: {expired_on}
dc_id: {prem.dc_id}
peer_users: {peer_users}
peer_group: {peer_group}
peer_channel: {peer_channel}
akeno_uptime: {uptime}
ryuzaki_library: {ryu}
"""
    photo_url = "https://telegra.ph/file/08dcd28e164b111e56546.jpg"
    results = [
        InlineQueryResultPhoto(
            photo_url=photo_url,
            caption=msg,
            input_message_content=InputTextMessageContent(
                msg, parse_mode=ParseMode.HTML
            ),
            reply_markup=reply_markup
        )
    ]
    await client.answer_inline_query(query.id, results=results, cache_time=0)
@app.on_inline_query(filters.regex("status_tools"))
@inline_wrapper
async def hello_world_2(client: Client, query: InlineQuery):
    """Answer a ``status_tools`` inline query with the caller's account status.

    The status card is only produced for users whose stored document has a
    truthy ``premium_users`` field; for everyone else the query is left
    unanswered.
    """
    user_id = query.from_user.id
    data_user = await client.get_users(user_id)
    first_name = data_user.first_name
    username = data_user.username

    token = collection.find_one({"user_id": user_id})
    prem_filter = token.get("premium_users") if token else None
    if not prem_filter:
        # ``status`` was only ever built for premium users. Returning here
        # avoids a NameError when building the result for anyone else.
        return

    expired = get_expired_date(user_id)
    reply_markup = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    text="MENU",
                    callback_data="check_back",
                )
            ]
        ]
    )

    # Each line ends with exactly one newline; the original string literals
    # were broken across physical lines, which is a syntax error.
    status = (
        "Status : Running\n"
        f"Expired_on : {expired}\n"
        f"First Name : {first_name}\n"
        f"Username : @{username}\n"
        f"UserID : {user_id}\n"
    )

    photo_url = "https://telegra.ph/file/08dcd28e164b111e56546.jpg"
    results = [
        InlineQueryResultPhoto(
            photo_url=photo_url,
            caption=status,
            input_message_content=InputTextMessageContent(
                status, parse_mode=ParseMode.HTML
            ),
            reply_markup=reply_markup
        )
    ]
    await client.answer_inline_query(query.id, results=results, cache_time=0)
@app.on_inline_query()
@inline_wrapper
async def inline_query_handler(client: Client, query):
    """Dispatch inline queries that no more specific handler claimed.

    Recognised queries: first word ``taiok`` (alive card), and queries that
    start with ``helper`` (help menu), ``ping`` (uptime) or ``stats <id>``
    (stats for the live object whose ``id()`` equals ``<id>``). Empty and
    unrecognised queries are left unanswered. Any exception is caught and its
    traceback printed.
    """
    try:
        text = query.query.strip().lower()
        string_given = query.query.lower()
        if not text:
            return

        if text.split()[0] == "taiok":
            results = await alive_function(query, [])
            cache_time = 10
        elif string_given.startswith("helper"):
            results = await help_function([])
            cache_time = 0
        elif string_given.startswith("ping"):
            results = await ping_function(query, [])
            cache_time = 10
        elif string_given.startswith("stats"):
            # The argument is the id() of a live object; find it among the
            # objects tracked by the garbage collector.
            wanted_id = int(query.query.split(None, 1)[1])
            matches = [obj for obj in get_objects() if id(obj) == wanted_id]
            results = await stats_function(matches[0], [])
            cache_time = 10
        else:
            return

        await client.answer_inline_query(
            query.id, results=results, cache_time=cache_time
        )
    except Exception:
        print(traceback.format_exc(), "InLine")