akn-dev / akn /manage /approve_params.py
randydev's picture
fix revert back and update
21bc372
raw
history blame
24.4 kB
import time
from datetime import datetime as dt, timedelta
from pyrogram import *
from pyrogram import Client as ren, filters
from pyrogram.types import *
from akn.utils.database import db as db_client
from akn.utils.logger import LOGS
from akn.manage.new_start_funcs import initial_client_bots
from config import *
storage_running = {}
active_bots = {}
@ren.on_callback_query(filters.regex(r"^stopcts_(\d+)$"))
async def stopv_client(bot: Client, cb: CallbackQuery):
user_id = int(cb.matches[0].group(1))
if user_id not in storage_running:
await cb.answer("No active client found for this user.", show_alert=True)
return
user_bots = storage_running[user_id]
if not user_bots:
await cb.answer("No active client found for this user.", show_alert=True)
return
if not user_bots.is_connected:
await cb.answer("Client is not connected, cannot stop.", show_alert=True)
return
try:
await user_bots.stop()
del storage_running[user_id]
await cb.answer("Client stopped successfully.", show_alert=True)
except AttributeError:
await cb.answer("Client object does not have 'stop' method.", show_alert=True)
return
except Exception as e:
await cb.answer(f"Error stopping client: {type(e).__name__}: {str(e)}", show_alert=True)
return
@ren.on_callback_query(filters.regex(r"^(rejected_alldl|pending_alldl|approved_alldl)_(\d+)_(\w+)$"))
async def handle_admin_action(client: Client, callback: CallbackQuery):
global storage_running
action, user_id, uuid = callback.matches[0].groups()
action_type = action.split('_')[0]
admin_id = callback.from_user.id
admin_mention = callback.from_user.mention
try:
request = await db_client.alldl_bot.find_one({"user_id": int(user_id)})
if not request:
await callback.answer("❌ User request not found!", show_alert=True)
return await callback.message.edit_text(
f"{callback.message.text}\n\n⚠️ Failed: Request not found in database"
)
target_bot = None
for bot in request.get("bots", []):
if bot.get("uuid") == uuid:
target_bot = bot
break
if not target_bot:
await callback.answer("❌ No matching bot found!", show_alert=True)
return await callback.message.edit_text(
f"{callback.message.text}\n\n⚠️ Failed: No matching bot found"
)
bot_token = target_bot["bot_token"]
update_data = {
"bots.$.status": action_type,
"bots.$.admin_action": {
"by": admin_id,
"at": dt.now().strftime("%Y-%m-%d %H:%M:%S"),
"username": callback.from_user.username
},
"last_updated": dt.now().strftime("%Y-%m-%d %H:%M:%S")
}
if action_type == "rejected":
update_data["bots.$.status"] = "rejected"
update_data["bots.$.admin_action"]["reason"] = "No reason provided"
update_result = await db_client.alldl_bot.update_one(
{
"user_id": int(user_id),
"bots.uuid": uuid
},
{"$set": update_data}
)
if not update_result.modified_count:
await callback.answer("❌ Update failed!", show_alert=True)
return await callback.message.edit_text(
f"{callback.message.text}\n\n⚠️ Failed: update unsuccessful"
)
if action_type == "approved":
try:
user_bots = initial_client_bots(bot_token)
await user_bots.start()
storage_running[user_id] = user_bots
bot_user = await user_bots.get_me()
bot_username = f"@{bot_user.username}"
link_start = f"https://t.me/{bot_user.username}?start=start"
await db_client.alldl_bot.update_one(
{
"user_id": int(user_id),
"bots.uuid": uuid
},
{
"$set": {
"bots.$.bot_id": bot_user.id,
"bots.$.status": "approved",
"bots.$.is_active": True,
"bots.$.created_at": dt.now().strftime("%Y-%m-%d %H:%M:%S"),
"bots.$.start_time": dt.now(),
"bots.$.bot_username": f"@{bot_user.username}",
"bots.$.bot_id": bot_user.id,
"bots.$.started_at": dt.now().strftime("%Y-%m-%d %H:%M:%S"),
"bots.$.admin_action": {
"by": admin_id,
"at": dt.now().strftime("%Y-%m-%d %H:%M:%S")
}
},
"$inc": {"deployed_bots": 1}
}
)
caption = (
"🎉 **Your AllDL Bot Has Been Approved!**\n\n"
f"🤖 Bot Name: {bot_user.first_name}\n"
f"🔗 Username: {bot_username}\n"
f"🆔 Bot ID: `{bot_user.id}`\n\n"
"You can now start using your bot!\n\n"
"All Downloader Bot By akn-dev"
)
keyboard_start_now = InlineKeyboardMarkup([
[InlineKeyboardButton("🚀 Start Using Bot", url=link_start)],
[InlineKeyboardButton("📊 Manage Bots", callback_data="my_bots")]
])
await client.send_photo(
user_id,
photo="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
caption=caption,
reply_markup=keyboard_start_now
)
await client.send_message(
PRIVATE_LOGS,
f"✅ Bot Approved Successfully\n\n"
f"👤 User: {request.get('username', 'N/A')} ({user_id})\n"
f"🤖 Bot: {bot_username}\n"
f"🛠 Approved by: {admin_mention}\n"
f"⏰ Time: {dt.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
except Exception as e:
LOGS.error(f"Bot approval error: {str(e)}")
await db_client.alldl_bot.update_one(
{
"user_id": int(user_id),
"bots.uuid": uuid
},
{
"$set": {
"bots.$.is_active": False,
"bots.$.error": str(e),
"bots.$.status": "error"
}
}
)
await client.send_message(
PRIVATE_LOGS,
f"❌ Bot Approval Failed\n\n"
f"User ID: {user_id}\n"
f"Error: {str(e)}\n"
f"UUID: `{uuid}`"
)
await client.send_message(
user_id,
"⚠️ Bot approval failed due to technical reasons.\n"
"Our team has been notified. Please try again later."
)
return
status_icon = {
"approved": "✅",
"rejected": "❌",
"pending": "🕒"
}.get(action_type, "ℹ️")
await callback.message.edit_text(
f"{callback.message.text}\n\n"
f"{status_icon} Status: {action_type.capitalize()}ed by {admin_mention}\n"
f"⏰ {dt.now().strftime('%Y-%m-%d %H:%M:%S')}",
reply_markup=None
)
await callback.answer(f"Request {action_type}d successfully!")
except Exception as e:
LOGS.error(f"Admin action error: {str(e)}")
await callback.answer("⚠️ An error occurred!", show_alert=True)
await callback.message.edit_text(
f"{callback.message.text}\n\n"
f"❌ Error: {str(e)}"
)
await client.send_message(
PRIVATE_LOGS,
f"🚨 Admin Action Error\n\n"
f"Action: {action}\n"
f"Admin: {admin_mention}\n"
f"Error: {str(e)}"
)
@ren.on_callback_query(filters.regex(r"^(approve_capt|reject_capt)_(\d+)$"))
async def handle_captbot_action(client, callback: CallbackQuery):
global storage_running
action, user_id = callback.matches[0].groups()
action_type = action.replace("_capt", "")
update_data = {
"status": "approved" if action_type == "approve" else "rejected",
"admin_action": {
"by": 6477856957,
"at": dt.now().strftime("%Y-%m-%d %H:%M:%S")
}
}
if action == "reject_capt":
update_data["admin_action"]["reason"] = "No reason provided"
await db_client.captcha_bot.update_one(
{"user_id": int(user_id)},
{"$set": update_data},
upsert=True
)
if action == "approve_capt":
request = await db_client.captcha_bot.find_one({"user_id": int(user_id)})
bot_token = request["bot_token"]
try:
user_bots = initial_client_bots(bot_token, "ApproveBot")
await user_bots.start()
storage_running[user_id] = user_bots
except Exception as e:
await client.send_message(user_id, "Sorry Try Again")
await client.send_message(PRIVATE_LOGS, f"Error {e}")
return
try:
bot_user = await user_bots.get_me()
bot_username = "@" + bot_user.username
bot_first_name = bot_user.first_name
link_start = "https://t.me/{}?start=start".format(bot_user.username)
caption = ""
caption += "Bot Name : {}\n".format(bot_first_name)
caption += "Bot Username : {}\n".format(bot_username)
caption += "Bot ID : <code>{}</code>\n".format(bot_user.id)
caption += "Captcha Bot By akn-dev\n"
keyboard_start_now = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Check Start Bot",
url=link_start
),
InlineKeyboardButton(
text="Stop Client",
callback_data=f"stopcts_{user_id}"
)
],
]
)
await client.send_photo(
user_id,
photo="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
caption=caption,
reply_markup=keyboard_start_now
)
await db_client.captcha_bot.update_one(
{"user_id": user_id},
{"$set": {"status": "approved"}},
upsert=True
)
except Exception as e:
await client.send_message(PRIVATE_LOGS, f"Error {e}")
await client.send_message(user_id, "Sorry Try Again")
return
status_icon = "✅" if action_type == "approve" else "❌"
await callback.message.edit_text(
f"{callback.message.text}\n\n"
f"Status: {status_icon} {action_type.capitalize()}ed by admin",
reply_markup=None
)
await callback.answer(f"Request {action_type}ed!")
@ren.on_callback_query(filters.regex(r"^(approve_sesibot|reject_sesibot)_(\d+)$"))
async def handle_sesibot_action(client, callback: CallbackQuery):
global storage_running
action, user_id = callback.matches[0].groups()
action_type = action.replace("_sesibot", "")
update_data = {
"status": "approved" if action_type == "approve" else "rejected",
"admin_action": {
"by": 6477856957,
"at": dt.now().strftime("%Y-%m-%d %H:%M:%S")
}
}
if action == "reject_sesibot":
update_data["admin_action"]["reason"] = "No reason provided"
await db_client.session_bot.update_one(
{"user_id": int(user_id)},
{"$set": update_data},
upsert=True
)
if action == "approve_sesibot":
request = await db_client.session_bot.find_one({"user_id": int(user_id)})
bot_token = request["bot_token"]
try:
user_bots = initial_client_bots(bot_token, plugins="SessionBot")
await user_bots.start()
storage_running[user_id] = user_bots
except Exception as e:
await client.send_message(user_id, "Sorry Try Again")
await client.send_message(PRIVATE_LOGS, f"Error {e}")
return
try:
bot_user = await user_bots.get_me()
bot_username = "@" + bot_user.username
bot_first_name = bot_user.first_name
link_start = "https://t.me/{}?start=start".format(bot_user.username)
caption = ""
caption += "Bot Name : {}\n".format(bot_first_name)
caption += "Bot Username : {}\n".format(bot_username)
caption += "Bot ID : <code>{}</code>\n".format(bot_user.id)
caption += "Session Bot By akn-dev\n"
keyboard_start_now = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Check Start Bot",
url=link_start
)
],
]
)
await client.send_photo(
user_id,
photo="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
caption=caption,
reply_markup=keyboard_start_now
)
await db_client.session_bot.update_one(
{"user_id": user_id},
{"$set": {"status": "approved"}},
upsert=True
)
except Exception as e:
await client.send_message(PRIVATE_LOGS, f"Error {e}")
await client.send_message(user_id, "Sorry Try Again")
return
status_icon = "✅" if action_type == "approve" else "❌"
await callback.message.edit_text(
f"{callback.message.text}\n\n"
f"Status: {status_icon} {action_type.capitalize()}ed by admin",
reply_markup=None
)
await callback.answer(f"Request {action_type}ed!")
@ren.on_callback_query(filters.regex(r"^(approve_magicbot|reject_magicbot)_(\d+)$"))
async def handle_magicbot_action(client, callback: CallbackQuery):
global storage_running
action, user_id = callback.matches[0].groups()
action_type = action.replace("_magicbot", "")
update_data = {
"status": "approved" if action_type == "approve" else "rejected",
"admin_action": {
"by": 6477856957,
"at": dt.now().strftime("%Y-%m-%d %H:%M:%S")
}
}
if action == "reject_magicbot":
update_data["admin_action"]["reason"] = "No reason provided"
await db_client.magic_bot.update_one(
{"user_id": int(user_id)},
{"$set": update_data},
upsert=True
)
if action == "approve_magicbot":
request = await db_client.magic_bot.find_one({"user_id": int(user_id)})
bot_token = request["bot_token"]
try:
user_bots = initial_client_bots(bot_token, plugins="MagicFonts")
await user_bots.start()
storage_running[user_id] = user_bots
except Exception as e:
await client.send_message(user_id, "Sorry Try Again")
await client.send_message(PRIVATE_LOGS, f"Error {e}")
return
try:
bot_user = await user_bots.get_me()
bot_username = "@" + bot_user.username
bot_first_name = bot_user.first_name
link_start = "https://t.me/{}?start=start".format(bot_user.username)
caption = ""
caption += "Bot Name : {}\n".format(bot_first_name)
caption += "Bot Username : {}\n".format(bot_username)
caption += "Bot ID : <code>{}</code>\n".format(bot_user.id)
caption += "Magic Fonts Bot By akn-dev\n"
keyboard_start_now = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Check Start Bot",
url=link_start
),
InlineKeyboardButton(
text="Stop Client",
callback_data=f"stopcts_{user_id}"
)
],
]
)
await client.send_photo(
user_id,
photo="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
caption=caption,
reply_markup=keyboard_start_now
)
await db_client.magic_bot.update_one(
{"user_id": user_id},
{"$set": {"status": "approved"}},
upsert=True
)
except Exception as e:
await client.send_message(PRIVATE_LOGS, f"Error {e}")
await client.send_message(user_id, "Sorry Try Again")
return
status_icon = "✅" if action_type == "approve" else "❌"
await callback.message.edit_text(
f"{callback.message.text}\n\n"
f"Status: {status_icon} {action_type.capitalize()}ed by admin",
reply_markup=None
)
await callback.answer(f"Request {action_type}ed!")
@ren.on_callback_query(filters.regex(r"^(approve_ytbot|reject_ytbot)_(\d+)$"))
async def handle_ytbot_action(client, callback: CallbackQuery):
global storage_running
action, user_id = callback.matches[0].groups()
action_type = action.replace("_magicbot", "")
update_data = {
"status": "approved" if action_type == "approve" else "rejected",
"admin_action": {
"by": 6477856957,
"at": dt.now().strftime("%Y-%m-%d %H:%M:%S")
}
}
if action == "reject_ytbot":
update_data["admin_action"]["reason"] = "No reason provided"
await db_client.youtube_bot.update_one(
{"user_id": int(user_id)},
{"$set": update_data},
upsert=True
)
if action == "approve_ytbot":
request = await db_client.youtube_bot.find_one({"user_id": int(user_id)})
bot_token = request["bot_token"]
try:
user_bots = initial_client_bots(bot_token, plugins="Youtube")
await user_bots.start()
except Exception as e:
await client.send_message(user_id, "Sorry Try Again")
await client.send_message(PRIVATE_LOGS, f"Error {e}")
return
try:
bot_user = await user_bots.get_me()
bot_username = "@" + bot_user.username
bot_first_name = bot_user.first_name
link_start = "https://t.me/{}?start=start".format(bot_user.username)
caption = ""
caption += "Bot Name : {}\n".format(bot_first_name)
caption += "Bot Username : {}\n".format(bot_username)
caption += "Bot ID : <code>{}</code>\n".format(bot_user.id)
caption += "Youtube Bot By akn-dev\n"
keyboard_start_now = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Check Start Bot",
url=link_start
),
InlineKeyboardButton(
text="Stop Client",
callback_data=f"stopcts_{user_id}"
)
],
]
)
await client.send_photo(
user_id,
photo="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
caption=caption,
reply_markup=keyboard_start_now
)
await db_client.youtube_bot.update_one(
{"user_id": user_id},
{"$set": {"status": "approved"}},
upsert=True
)
except Exception as e:
await client.send_message(PRIVATE_LOGS, f"Error {e}")
await client.send_message(user_id, "Sorry Try Again")
return
status_icon = "✅" if action_type == "approve" else "❌"
await callback.message.edit_text(
f"{callback.message.text}\n\n"
f"Status: {status_icon} {action_type.capitalize()}ed by admin",
reply_markup=None
)
await callback.answer(f"Request {action_type}ed!")
@ren.on_callback_query(filters.regex(r"^(approve_geminibot|reject_geminibot)_(\d+)$"))
async def handle_geminibot_action(client, callback: CallbackQuery):
global storage_running
action, user_id = callback.matches[0].groups()
action_type = action.replace("_geminibot", "")
update_data = {
"status": "approved" if action_type == "approve" else "rejected",
"admin_action": {
"by": 6477856957,
"at": dt.now().strftime("%Y-%m-%d %H:%M:%S")
}
}
if action == "reject_geminibot":
update_data["admin_action"]["reason"] = "No reason provided"
await db_client.gemini_bot.update_one(
{"user_id": int(user_id)},
{"$set": {"status": "approved"}},
upsert=True
)
if action == "approve_geminibot":
request = await db_client.gemini_bot.find_one({"user_id": int(user_id)})
bot_token = request["bot_token"]
try:
user_bots = initial_client_bots(bot_token, plugins="Gemini")
await user_bots.start()
storage_running[user_id] = user_bots
except Exception as e:
await client.send_message(user_id, "Sorry Try Again")
await client.send_message(PRIVATE_LOGS, f"Error {e}")
return
try:
bot_user = await user_bots.get_me()
bot_username = "@" + bot_user.username
bot_first_name = bot_user.first_name
link_start = "https://t.me/{}?start=start".format(bot_user.username)
caption = ""
caption += "Bot Name : {}\n".format(bot_first_name)
caption += "Bot Username : {}\n".format(bot_username)
caption += "Bot ID : <code>{}</code>\n".format(bot_user.id)
caption += "Gemini Bot By akn-dev\n"
keyboard_start_now = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Check Start Bot",
url=link_start
),
InlineKeyboardButton(
text="Stop Client",
callback_data=f"stopcts_{user_id}"
)
],
]
)
await client.send_photo(
user_id,
photo="https://telegra.ph//file/586a3867c3e16ca6bb4fa.jpg",
caption=caption,
reply_markup=keyboard_start_now
)
await db_client.gemini_bot.update_one(
{"user_id": user_id},
{"$set": {"status": "approved"}},
upsert=True
)
except Exception as e:
await client.send_message(PRIVATE_LOGS, f"Error {e}")
await client.send_message(user_id, "Sorry Try Again")
return
status_icon = "✅" if action_type == "approve" else "❌"
await callback.message.edit_text(
f"{callback.message.text}\n\n"
f"Status: {status_icon} {action_type.capitalize()}ed by admin",
reply_markup=None
)
await callback.answer(f"Request {action_type}ed!")