# akn-dev / akn /Akeno /bot /inline.py
# randydev's picture
# fix revert back and update
# 21bc372
import time
import requests
import json
import traceback
from sys import version as pyver
import os
import shlex
import textwrap
from typing import Tuple
import asyncio
from gc import get_objects
from datetime import datetime as dt
from pyrogram import *
from pyrogram.types import *
from pyrogram import Client, filters
from pyrogram import __version__ as pyrover
from pyrogram.enums import ParseMode
from pyrogram.types import (
InlineKeyboardButton,
InlineKeyboardMarkup,
InlineQueryResultArticle,
InputTextMessageContent,
Message,
)
from akn import CMD_HELP, StartTime, app, START_TIME
from akn.Akeno.helper.data import Data
from akn.Akeno.helper.inline import inline_wrapper, paginate_help
from akn.Akeno.bot import *
from pymongo import MongoClient
from config import MONGO_URL
from RyuzakiLib import __version__ as ryu
# Module-level MongoDB client, created once at import time from the configured URL.
client_mongo = MongoClient(MONGO_URL)
# "users" collection of the "tiktokbot" database; the lookup helpers in this
# module query it with {"user_id": ...} filters.
db = client_mongo["tiktokbot"]
collection = db["users"]
def get_verify_id_card(user_id):
    """Return the ``idcard`` field stored for *user_id*.

    Yields ``None`` when no (non-empty) document exists for the user or the
    document has no ``idcard`` field.
    """
    record = collection.find_one({"user_id": user_id})
    return record.get("idcard") if record else None
def get_expired_date(user_id):
    """Return the ``expire_date`` field stored for *user_id*.

    Yields ``None`` when no (non-empty) document exists for the user or the
    document has no ``expire_date`` field.
    """
    record = collection.find_one({"user_id": user_id})
    if not record:
        return None
    return record.get("expire_date")
def get_via_inline_bot(user_id):
    """Return ``(ping_dc, peer_users, peer_group)`` stored for *user_id*.

    Always returns a 3-tuple. Every element is ``None`` when the user has no
    document, and an individual element is ``None`` when its field is absent.
    """
    record = collection.find_one({"user_id": user_id})
    if not record:
        return None, None, None
    # Use .get like the sibling lookup helpers: a document that lacks one of
    # these fields should yield None for it rather than raise KeyError.
    return (
        record.get("ping_dc"),
        record.get("peer_users"),
        record.get("peer_group"),
    )
def get_install_peer(user_id):
    """Return ``(peer_users_2, peer_group_2, peer_channel_2)`` for *user_id*.

    Returns ``None`` (not a tuple) when no document exists for the user, so
    callers must check the result before unpacking it.
    """
    record = collection.find_one({"user_id": user_id})
    if not record:
        return None
    field_names = ("peer_users_2", "peer_group_2", "peer_channel_2")
    return tuple(record.get(field) for field in field_names)
def get_test_button(user_id):
    """Return ``(tiktok_text, tiktok_button)`` stored for *user_id*.

    Returns ``None`` (not a tuple) when no document exists for the user, so
    callers must check the result before unpacking it.
    """
    record = collection.find_one({"user_id": user_id})
    if not record:
        return None
    return record.get("tiktok_text"), record.get("tiktok_button")
async def get_readable_time(seconds: int) -> str:
    """Format a duration in seconds as e.g. ``"1Hari, 2Jam:3m:4s"``.

    Units are emitted from the largest non-zero unit down to seconds, so 60
    becomes ``"1m:0s"`` and 59 becomes ``"59s"``. A duration of exactly zero
    yields an empty string. Fractional seconds are truncated. Negative
    durations are not meaningful input.

    Args:
        seconds: Elapsed time in seconds (an int or a float).

    Returns:
        The formatted duration.
    """
    if seconds == 0:
        return ""

    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    # Days are the plain quotient. The previous implementation kept
    # ``days % 24`` here, so any uptime of 24 days or more wrapped around.
    days, hours = divmod(hours, 24)

    readable = f"{secs}s"
    if minutes or hours or days:
        readable = f"{minutes}m:{readable}"
    if hours or days:
        readable = f"{hours}Jam:{readable}"
    if days:
        readable = f"{days}Hari, {readable}"
    return readable
async def stats_function(message, answers):
    """Append a status-card inline article for the account behind *message*.

    Counts the account's private and group/supergroup dialogs, times one
    ``Ping`` request, and renders the figures into an HTML message.

    Args:
        message: Object exposing its owning client as ``message._client``;
            that client's ``me`` must be populated.
        answers: List of inline results; the new article is appended in place.

    Returns:
        The same *answers* list.
    """
    client = message._client
    me = client.me

    users = 0
    group = 0
    async for dialog in client.get_dialogs():
        if dialog.chat.type == enums.ChatType.PRIVATE:
            users += 1
        elif dialog.chat.type in (enums.ChatType.GROUP, enums.ChatType.SUPERGROUP):
            group += 1

    if me.id == 1191668125:
        status = "[DEV]"
    elif me.id == 901878554:
        status = "[ADMIN]"
    else:
        status = "[FREE USER]"

    start = dt.now()
    await client.invoke(Ping(ping_id=0))
    # total_seconds() covers the whole interval; ``.microseconds`` is only the
    # sub-second component and under-reports any round trip longer than 1 s.
    ping = round((dt.now() - start).total_seconds() * 1000, 3)

    msg = f"""
<b>Tiktok Ubot</b>
<b>Status : {status}</b>
<b>expired_on:</b> <code>unlimited</code>
<b>full_name:</b> <code>{me.first_name}</code>
<b>premium:</b> <code>{me.is_premium}</code>
<b>dc_id:</b> <code>{me.dc_id}</code>
<b>ping_dc:</b> <code>{ping} ms</code>
<b>peer_users:</b> <code>{users} users</code>
<b>peer_group:</b> <code>{group} group</code>
"""
    answers.append(
        InlineQueryResultArticle(
            title="Alivei",
            description="alive inline",
            thumb_url="https://telegra.ph/file/b4cdd0843ac94d0309ba7.jpg",
            input_message_content=InputTextMessageContent(
                msg, parse_mode=ParseMode.HTML, disable_web_page_preview=True
            ),
            reply_markup=InlineKeyboardMarkup(
                [[InlineKeyboardButton("Ping", callback_data="pingCB")]]
            ),
        )
    )
    return answers
async def alive_function(message: Message, answers):
    """Append an "Alive" status-card inline article for the account behind *message*.

    Counts the account's private and group/supergroup dialogs, times one
    ``Ping`` request, and renders the figures into an HTML message.

    Args:
        message: Object exposing its owning client as ``message._client``;
            that client's ``me`` must be populated.
        answers: List of inline results; the new article is appended in place.

    Returns:
        The same *answers* list.
    """
    client = message._client
    me = client.me

    users = 0
    group = 0
    async for dialog in client.get_dialogs():
        if dialog.chat.type == enums.ChatType.PRIVATE:
            users += 1
        elif dialog.chat.type in (enums.ChatType.GROUP, enums.ChatType.SUPERGROUP):
            group += 1

    if me.id == 1191668125:
        status = "[DEV]"
    elif me.id == 901878554:
        status = "[ADMIN]"
    else:
        status = "[FREE USER]"

    start = dt.now()
    await client.invoke(Ping(ping_id=0))
    # total_seconds() covers the whole interval; ``.microseconds`` is only the
    # sub-second component and under-reports any round trip longer than 1 s.
    ping = round((dt.now() - start).total_seconds() * 1000, 3)

    msg = f"""
<b>Tiktok Ubot</b>
<b>Status : {status}</b>
<b>expired_on:</b> <code>unlimited</code>
<b>full_name:</b> <code>{me.first_name}</code>
<b>premium:</b> <code>{me.is_premium}</code>
<b>dc_id:</b> <code>{me.dc_id}</code>
<b>ping_dc:</b> <code>{ping} ms</code>
<b>peer_users:</b> <code>{users} users</code>
<b>peer_group:</b> <code>{group} group</code>
"""
    answers.append(
        InlineQueryResultArticle(
            title="Alive",
            description="Check Bot's Stats",
            thumb_url="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
            input_message_content=InputTextMessageContent(
                msg, parse_mode=ParseMode.HTML, disable_web_page_preview=True
            ),
            reply_markup=InlineKeyboardMarkup(
                [[InlineKeyboardButton("Ping", callback_data="pingCB")]]
            ),
        )
    )
    return answers
async def ping_function(query, answers):
    """Append a "Ping" inline article showing the time elapsed since START_TIME.

    Args:
        query: Not read by this function; kept so the signature matches the
            other ``*_function`` builders.
        answers: List of inline results; the new article is appended in place.

    Returns:
        The same *answers* list.
    """
    # The elapsed time is rendered straight from START_TIME; the previously
    # awaited get_readable_time() result was never used and has been dropped.
    msg = f"""
• <b>ᴜᴘᴛɪᴍᴇ:</b> <code>{str(dt.now() - START_TIME).split('.')[0]}</code>
"""
    answers.append(
        InlineQueryResultArticle(
            title="Ping",
            description="ping test",
            thumb_url="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
            input_message_content=InputTextMessageContent(
                msg, parse_mode=ParseMode.HTML, disable_web_page_preview=True
            ),
            reply_markup=InlineKeyboardMarkup(
                [[InlineKeyboardButton("Ping", callback_data="pingCB")]]
            ),
        )
    )
    return answers
async def help_function(answers):
    """Append the paginated help-menu inline article to *answers* and return it."""
    keyboard_rows = paginate_help(0, CMD_HELP, "helpme")
    menu_text = InputTextMessageContent(
        Data.text_help_menu.format(len(CMD_HELP)), disable_web_page_preview=True
    )
    help_article = InlineQueryResultArticle(
        title="Help Article!",
        description="Check Command List & Help",
        thumb_url="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
        input_message_content=menu_text,
        reply_markup=InlineKeyboardMarkup(keyboard_rows),
    )
    answers.append(help_article)
    return answers
@app.on_inline_query(filters.regex("bkp"))
async def inline_query_handler(client: Client, message: Message, query):
    """Handle inline queries matching ``bkp`` followed by exactly one link argument.

    Downloads a JSON list of blacklisted prefixes; if the link starts with one
    of them an "Error" article is returned, otherwise a message copy is
    attempted and an empty result list is sent.
    """
    # NOTE(review): the other inline-query handlers in this file are declared as
    # (client, query); this one has an extra ``message`` parameter. Presumably
    # Pyrogram invokes inline handlers with two positional arguments, in which
    # case this handler fails with TypeError before its body runs - confirm.
    # NOTE(review): this function name is defined again by the catch-all inline
    # handler later in the file, which rebinds the module-level name.
    # Everything after the first word is treated as arguments; exactly one is required.
    command_args = query.query.split()[1:]
    if len(command_args) != 1:
        return
    link = command_args[0]
    blacklist_url = "https://raw.githubusercontent.com/xtsea/pyKillerX/main/blacklist_channel.json"
    # NOTE(review): synchronous HTTP call with no timeout inside an async
    # handler; it is not awaited, so the coroutine cannot yield while the
    # request is in flight.
    response = requests.get(blacklist_url)
    if response.status_code == 200:
        blacklist_str = response.text.strip()
        # Iterated below as a collection of string prefixes.
        blacklist = json.loads(blacklist_str)
    else:
        print("Failed to fetch blacklist")
        return
    if any(link.startswith(prefix) for prefix in blacklist):
        results = [
            InlineQueryResultArticle(
                title="Error",
                input_message_content=InputTextMessageContent(
                    "Sorry, this command is not allowed in this channel/group."
                ),
            )
        ]
    else:
        # NOTE(review): ``chat_id`` and ``message_id`` are never assigned in this
        # function or in the visible module scope; unless a star import supplies
        # them this line raises NameError. ``link`` is also not parsed here.
        await client.copy_message(message.chat.id, from_chat_id=chat_id, message_id=message_id, caption=None)
        results = []
    await client.answer_inline_query(query.id, results=results, cache_time=0)
@app.on_callback_query(filters.regex("^closeuser_"))
async def closeuser(bot: Client, cb: CallbackQuery):
    """Delete the message carrying a ``closeuser_<user_id>`` button.

    Only the user whose id is embedded in the callback data may close the
    message; anyone else gets a "Not allowed user." alert.

    Args:
        bot: Client that received the callback.
        cb: Callback query whose data has the form ``closeuser_<user_id>``.
    """
    try:
        owner_id = int(cb.data.split("_")[1])
    except (IndexError, ValueError):
        # Malformed payload: nobody can be verified as the owner.
        await cb.answer("Not allowed user.", True)
        return

    if cb.from_user.id != owner_id:
        await cb.answer("Not allowed user.", True)
        return

    if cb.message is None:
        # The button sits on an inline message: there is no chat/message id to
        # delete with, only ``inline_message_id``, which delete_messages()
        # cannot take as a message id.
        print("closeuser: callback has no regular message to delete")
        return

    try:
        # delete_messages() expects integer message ids. The previous code
        # passed the ``inline_message_id`` string, which never deleted anything.
        await bot.delete_messages(cb.message.chat.id, message_ids=cb.message.id)
    except Exception as e:
        # Best effort: a failed delete must not crash the handler.
        print(str(e))
@app.on_inline_query(filters.regex("test"))
@inline_wrapper
async def hello_world(client: Client, query: InlineQuery):
    """Answer the ``test`` inline query with the caller's account status card.

    The card shows premium state, expiry date, DC id, stored peer counters,
    process uptime and the RyuzakiLib version. The button under it is the
    user's custom link button when both its text and URL are stored, otherwise
    an "OWNER" button pointing at the user.

    Args:
        client: Client used to look up the user and answer the query.
        query: Incoming inline query.
    """
    user_id = query.from_user.id

    # Both helpers return None (not a tuple) for users without a stored
    # document, so fall back to placeholders instead of failing to unpack.
    peer_users, peer_group, peer_channel = get_install_peer(user_id) or (None, None, None)
    tiktok_text, tiktok_button = get_test_button(user_id) or (None, None)

    if tiktok_text and tiktok_button:
        button = InlineKeyboardButton(text=f"{tiktok_text}", url=f"{tiktok_button}")
    else:
        button = InlineKeyboardButton(text="OWNER", user_id=user_id)
    reply_markup = InlineKeyboardMarkup([[button]])

    if get_verify_id_card(user_id):
        expired_on = "Unlimited"
    else:
        expired_on = get_expired_date(user_id)
        if expired_on is None:
            expired_on = "Unlimited"
        else:
            expired_on = expired_on.strftime("%d-%m-%Y")

    # Time the Telegram request this handler makes anyway. The previous code
    # subtracted two back-to-back dt.now() calls and so always reported ~0.
    started = time.perf_counter()
    prem = await client.get_users(user_id)
    ping = round((time.perf_counter() - started) * 1000, 3)

    # Uptime is measured from the process start marker; it used to be
    # ``dt.now() - dt.now()``, which is never a real duration.
    uptime = str(dt.now() - START_TIME).split(".")[0]

    if prem.is_premium:
        msg = f"""
<b>Akeno Ubot</b>
<b>Status :</b> <i>Ultra Diamond</i>
<b>premium:</b> <code>{prem.is_premium}</code>
<b>expired_on:</b> <code>{expired_on}</code>
<b>dc_id:</b> <code>{prem.dc_id}</code>
<b>ping_dc:</b> <code>{ping}</code>
<b>peer_users:</b> <code>{peer_users}</code>
<b>peer_group:</b> <code>{peer_group}</code>
<b>peer_channel:</b> <code>{peer_channel}</code>
<b>akeno_uptime:</b> <code>{uptime}</code>
<b>ryuzaki_library:</b> <code>{ryu}</code>
"""
    else:
        msg = f"""
<b>Akeno Ubot</b>
<b>Status :</b> <i>Non-Ultra</i>
<b>premium:</b> <code>Non-Premium</code>
<b>expired_on:</b> <code>{expired_on}</code>
<b>dc_id:</b> <code>{prem.dc_id}</code>
<b>peer_users:</b> <code>{peer_users}</code>
<b>peer_group:</b> <code>{peer_group}</code>
<b>peer_channel:</b> <code>{peer_channel}</code>
<b>akeno_uptime:</b> <code>{uptime}</code>
<b>ryuzaki_library:</b> <code>{ryu}</code>
"""
    photo_url = "https://telegra.ph/file/08dcd28e164b111e56546.jpg"
    results = [
        InlineQueryResultPhoto(
            photo_url=photo_url,
            caption=msg,
            input_message_content=InputTextMessageContent(
                msg, parse_mode=ParseMode.HTML
            ),
            reply_markup=reply_markup
        )
    ]
    await client.answer_inline_query(query.id, results=results, cache_time=0)
@app.on_inline_query(filters.regex("status_tools"))
@inline_wrapper
async def hello_world_2(client: Client, query: InlineQuery):
    """Answer the ``status_tools`` inline query with a status card for premium users.

    Users whose stored document has no truthy ``premium_users`` field get no
    answer at all.

    Args:
        client: Client used to look up the user and answer the query.
        query: Incoming inline query.
    """
    user_id = query.from_user.id
    data_user = await client.get_users(user_id)

    record = collection.find_one({"user_id": user_id})
    if not (record and record.get("premium_users")):
        # The status text and results are only built for premium users, so
        # stop here rather than reach the answer call with nothing bound.
        return

    expired = get_expired_date(user_id)
    # Accounts without a username used to render as "@None".
    username = f"@{data_user.username}" if data_user.username else "-"

    reply_markup = InlineKeyboardMarkup(
        [[InlineKeyboardButton(text="MENU", callback_data="check_back")]]
    )
    status = (
        f"<b>Status</b> : <code>Running</code>\n"
        f"<b>Expired_on</b> : <code>{expired}</code>\n"
        f"<b>First Name</b> : <code>{data_user.first_name}</code>\n"
        f"<b>Username</b> : <code>{username}</code>\n"
        f"<b>UserID</b> : <code>{user_id}</code>\n"
    )
    photo_url = "https://telegra.ph/file/08dcd28e164b111e56546.jpg"
    results = [
        InlineQueryResultPhoto(
            photo_url=photo_url,
            caption=status,
            input_message_content=InputTextMessageContent(
                status, parse_mode=ParseMode.HTML
            ),
            reply_markup=reply_markup
        )
    ]
    await client.answer_inline_query(query.id, results=results, cache_time=0)
@app.on_inline_query()
@inline_wrapper
async def inline_query_handler(client: Client, query):
    """Catch-all inline dispatcher for the taiok / helper / ping / stats keywords.

    Builds the matching result list and answers the query. Empty or
    unrecognised queries get no answer. Any exception is printed as a
    traceback and otherwise swallowed.
    """
    try:
        text = query.query.strip().lower()
        string_given = query.query.lower()
        collected = []
        if not text.strip():
            return

        if text.split()[0] == "taiok":
            payload = await alive_function(query, collected)
            ttl = 10
        elif string_given.startswith("helper"):
            payload = await help_function(collected)
            ttl = 0
        elif string_given.startswith("ping"):
            payload = await ping_function(query, collected)
            ttl = 10
        elif string_given.startswith("stats"):
            # The argument is the id() of a live object; it is located by
            # scanning every object the garbage collector tracks.
            wanted_id = int(query.query.split(None, 1)[1])
            candidates = [obj for obj in get_objects() if id(obj) == wanted_id]
            payload = await stats_function(candidates[0], collected)
            ttl = 10
        else:
            return

        await client.answer_inline_query(query.id, results=payload, cache_time=ttl)
    except Exception:
        print(traceback.format_exc(), "InLine")