|
from datetime import datetime as dt, timedelta |
|
import re |
|
import uuid |
|
import traceback |
|
from pyrogram import Client |
|
from pyrogram import Client as ren, filters |
|
from pyrogram.types import * |
|
from akn.utils.database import db as db_client |
|
from config import PRIVATE_LOGS |
|
from akn.utils.logger import LOGS |
|
|
|
@ren.on_callback_query(filters.regex("^alldl_bot$")) |
|
async def new_alldlbot_clone(bot: Client, cb: CallbackQuery): |
|
user_id = cb.from_user.id |
|
chat_id = cb.message.chat.id |
|
user_mention = cb.from_user.mention |
|
|
|
if await db_client.get_maintance(): |
|
await cb.answer("π§ Bot is under maintenance. Please try later.", show_alert=True) |
|
return await cb.message.edit_text("π§ Maintenance Mode\n\n@xpushz is working on updates...") |
|
|
|
try: |
|
await cb.message.delete() |
|
|
|
try: |
|
bot_token_ask = await cb.message.chat.ask( |
|
"π€ Please send your bot token (from @BotFather):\n\n" |
|
"Format should be: `1234567890:ABCDEFGHIJKLMNOPQRSTUVWXYZ`\n\n" |
|
"Type /cancel to abort", |
|
timeout=300 |
|
) |
|
except TimeoutError: |
|
await bot.send_message(chat_id, "β³ Session timed out after 5 minutes of inactivity.") |
|
return |
|
except Exception as e: |
|
LOGS.error(f"Error asking for bot token: {str(e)}") |
|
await bot.send_message(chat_id, "β οΈ error processing your request.") |
|
return |
|
|
|
if bot_token_ask.text.lower() == "/cancel": |
|
await bot.send_message(chat_id, "β Request cancelled") |
|
return |
|
|
|
bot_token = bot_token_ask.text.strip() |
|
await bot_token_ask.delete() |
|
|
|
if not re.match(r"^\d{9,11}:[a-zA-Z0-9_-]{35}$", bot_token): |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ Invalid bot token format!\n\n" |
|
"Please ensure:\n" |
|
"1. It's copied exactly from @BotFather\n" |
|
"2. Format is: `1234567890:ABCDEFGHIJKLMNOPQRSTUVWXYZ`\n\n" |
|
"Try again or contact support if issues persist." |
|
) |
|
return |
|
|
|
existing_request = await db_client.alldl_bot.find_one({"user_id": user_id}) |
|
|
|
token_exists = await db_client.alldl_bot.find_one( |
|
{"bots.bot_token": bot_token} |
|
) |
|
if token_exists: |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ This bot token is already registered in our system!\n\n" |
|
"Please create a new bot via @BotFather if you want to deploy another instance.", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Create New Bot", url="https://t.me/BotFather")], |
|
[InlineKeyboardButton("π Try Again", callback_data="alldl_bot")], |
|
[InlineKeyboardButton("π Manage Bots", callback_data="my_bots")] |
|
]) |
|
) |
|
return |
|
|
|
if existing_request and not existing_request.get("is_premium", False): |
|
approved_bots = sum(1 for bot in existing_request.get("bots", []) if bot.get("status") == "approved") |
|
if approved_bots >= 1: |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ **Free Tier Limit Reached!**\n\n" |
|
"πΉ Free users can deploy **1 bot maximum**\n" |
|
"πΉ Premium users get **unlimited bots**\n\n" |
|
"**Premium Features:**\n" |
|
"β
Multiple simultaneous bots\n" |
|
"β
Priority support\n" |
|
"β
Advanced configurations", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Upgrade to Premium", callback_data="premium_upgrades")], |
|
[InlineKeyboardButton("π Manage Existing Bots", callback_data="my_bots")] |
|
]) |
|
) |
|
return |
|
bot_uuid = str(uuid.uuid4())[:8] |
|
admin_buttons = InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β
Approve", callback_data=f"approved_alldl_{user_id}_{bot_uuid}"), |
|
InlineKeyboardButton("β Reject", callback_data=f"rejected_alldl_{user_id}_{bot_uuid}")], |
|
[InlineKeyboardButton("π€ View User", url=f"tg://user?id={user_id}")] |
|
]) |
|
|
|
current_time = dt.now().strftime('%Y-%m-%d %H:%M:%S') |
|
bot_data = { |
|
"uuid": bot_uuid, |
|
"user_id": user_id, |
|
"bot_token": bot_token, |
|
"status": "pending", |
|
"is_active": False, |
|
"timestamp": current_time |
|
} |
|
|
|
if existing_request: |
|
await db_client.alldl_bot.update_one( |
|
{"user_id": user_id}, |
|
{"$push": {"bots": bot_data}, |
|
"$set": {"last_updated": current_time}}, |
|
upsert=True |
|
) |
|
approved_count = sum(1 for bot in existing_request.get("bots", []) if bot.get("status") == "approved") |
|
else: |
|
await db_client.alldl_bot.insert_one({ |
|
"user_id": user_id, |
|
"username": cb.from_user.username, |
|
"bots": [bot_data], |
|
"deployed_bots": 0, |
|
"is_premium": False, |
|
"created_at": current_time, |
|
"last_updated": current_time |
|
}) |
|
approved_count = 0 |
|
|
|
await bot.send_message( |
|
chat_id, |
|
f"β
**Deployment Request Submitted**\n\n" |
|
f"β«οΈ Current approved bots: {approved_count}\n" |
|
f"β«οΈ This will be bot #{approved_count + 1}\n\n" |
|
f"β³ Admin approval usually takes <15 minutes\n\n" |
|
f"π Token: `{bot_token[:10]}...`", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Check Status", callback_data=f"statusdl_{user_id}")], |
|
[InlineKeyboardButton("π Request Another", callback_data="customzie_bot")] |
|
]) |
|
) |
|
|
|
await bot.send_message( |
|
chat_id=PRIVATE_LOGS, |
|
text=f"π₯ **New AllDL Bot Request**\n\n" |
|
f"π€ User: {user_mention} (`{user_id}`)\n" |
|
f"π Username: @{cb.from_user.username}\n" |
|
f"π Token: `{bot_token[:10]}...`\n" |
|
f"β° Submitted: {current_time}\n" |
|
f"π· Tier: {'π Premium' if existing_request and existing_request.get('is_premium') else 'π Free'}", |
|
reply_markup=admin_buttons |
|
) |
|
|
|
except Exception as e: |
|
LOGS.error(f"AllDL Bot Request Error: {type(e).__name__} - {str(e)}") |
|
traceback.print_exc() |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ Please try again later.\n" |
|
"If this persists, contact @xpushz", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Try Again", callback_data="alldl_bot")] |
|
]) |
|
) |
|
await bot.send_message( |
|
PRIVATE_LOGS, |
|
f"π¨ AllDL Bot Request Error\n\n" |
|
f"User: {user_mention} ({user_id})\n" |
|
f"Error: {type(e).__name__}\n" |
|
f"Details: {str(e)}" |
|
) |
|
|
|
@ren.on_callback_query(filters.regex("^captcha_bot$")) |
|
async def new_captcha_clone(bot: Client, cb: CallbackQuery): |
|
user_id = cb.from_user.id |
|
chat_id = cb.message.chat.id |
|
|
|
if await db_client.get_maintance(): |
|
await cb.answer("π§ Bot is under maintenance. Please try later.", show_alert=True) |
|
return await cb.message.edit_text("π§ Maintenance Mode\n\n@xpushz is working on updates...") |
|
try: |
|
await cb.message.delete() |
|
try: |
|
bot_token_ask = await cb.message.chat.ask( |
|
"π€ Please send your bot token (from @BotFather):\n\n" |
|
"Format should be: `1234567890:ABCDEFGHIJKLMNOPQRSTUVWXYZ`\n\n" |
|
"Type /cancel to abort", |
|
timeout=300 |
|
) |
|
except TimeoutError: |
|
await bot.send_message(chat_id, "β³ Timeout: No response received after 5 minutes") |
|
return |
|
|
|
if bot_token_ask.text.lower() == "/cancel": |
|
await bot.send_message(chat_id, "β Request cancelled") |
|
return |
|
|
|
bot_token = bot_token_ask.text.strip() |
|
await bot_token_ask.delete() |
|
|
|
if not re.match(r"^\d+:[a-zA-Z0-9_-]+$", bot_token): |
|
await bot.send_message(chat_id, "β οΈ Invalid bot token format!\nExample: `123456:ABC-DEF123xyz`") |
|
return |
|
|
|
existing_request = await db_client.captcha_bot.find_one({"user_id": user_id}) |
|
|
|
admin_buttons = InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β
Approve", callback_data=f"approve_capt_{user_id}"), |
|
InlineKeyboardButton("β Reject", callback_data=f"reject_capt_{user_id}")] |
|
]) |
|
if existing_request: |
|
if existing_request.get("status") == "approved": |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ You already have an approved bot!\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β Back", callback_data="customzie_bot")] |
|
]) |
|
) |
|
return |
|
if existing_request.get("status") == "rejected": |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ Your previous request was rejected!\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β Back", callback_data="customzie_bot")] |
|
]) |
|
) |
|
return |
|
if existing_request.get("status") == "pending": |
|
await bot.send_message( |
|
chat_id=PRIVATE_LOGS, |
|
text=f"π₯ New captcha Bot Request is Already\n\n" |
|
f"π€ User: {cb.from_user.mention} (`{user_id}`)\n" |
|
f"π Token: `{bot_token}`\n" |
|
f"β° Submitted: {existing_request.get('timestamp', 'N/A')}", |
|
reply_markup=admin_buttons, |
|
) |
|
await bot.send_message( |
|
chat_id, |
|
"β³ Captcha: You already have a pending request!\n" |
|
f"Submitted at: {existing_request.get('timestamp', 'N/A')}\n\n" |
|
"Please wait for admin approval.\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Status Check", callback_data=f"statuscapt_{user_id}")] |
|
]) |
|
) |
|
return |
|
await db_client.captcha_bot.update_one( |
|
{"user_id": user_id}, |
|
{"$set": { |
|
"bot_token": bot_token, |
|
"status": "pending", |
|
"timestamp": dt.now().strftime('%Y-%m-%d %H:%M') |
|
}}, |
|
upsert=True |
|
) |
|
await bot.send_message( |
|
chat_id=PRIVATE_LOGS, |
|
text=f"π₯ New Captcha Bot Request\n\n" |
|
f"π€ User: {cb.from_user.mention} (`{user_id}`)\n" |
|
f"π Token: `{bot_token}`\n" |
|
f"β° Submitted: {dt.now().strftime('%Y-%m-%d %H:%M')}", |
|
reply_markup=admin_buttons, |
|
) |
|
await bot.send_message( |
|
chat_id, |
|
"β
Captcha: Request submitted successfully!\n\n" |
|
"Our admins will review your submission shortly.\n" |
|
"You'll receive a notification when processed.", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Check Status", callback_data=f"statuscapt_{user_id}")] |
|
]) |
|
) |
|
except Exception as e: |
|
LOGS.error(f"captcha Bot Request Error: {str(e)}") |
|
traceback.print_exc() |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ Please try again later.\n" |
|
"If this persists, contact @xpushz", |
|
) |
|
await bot.send_message(PRIVATE_LOGS, f"Error: {type(e).__name__} - {str(e)}") |
|
|
|
@ren.on_callback_query(filters.regex("^sesikntl$")) |
|
async def new_sessionbot_clone(bot: Client, cb: CallbackQuery): |
|
user_id = cb.from_user.id |
|
chat_id = cb.message.chat.id |
|
|
|
if await db_client.get_maintance(): |
|
await cb.answer("π§ Bot is under maintenance. Please try later.", show_alert=True) |
|
return await cb.message.edit_text("π§ Maintenance Mode\n\n@xpushz is working on updates...") |
|
try: |
|
await cb.message.delete() |
|
try: |
|
bot_token_ask = await cb.message.chat.ask( |
|
"π€ Please send your bot token (from @BotFather):\n\n" |
|
"Format should be: `1234567890:ABCDEFGHIJKLMNOPQRSTUVWXYZ`\n\n" |
|
"Type /cancel to abort", |
|
timeout=300 |
|
) |
|
except TimeoutError: |
|
await bot.send_message(chat_id, "β³ Timeout: No response received after 5 minutes") |
|
return |
|
|
|
if bot_token_ask.text.lower() == "/cancel": |
|
await bot.send_message(chat_id, "β Request cancelled") |
|
return |
|
|
|
bot_token = bot_token_ask.text.strip() |
|
await bot_token_ask.delete() |
|
|
|
if not re.match(r"^\d+:[a-zA-Z0-9_-]+$", bot_token): |
|
await bot.send_message(chat_id, "β οΈ Invalid bot token format!\nExample: `123456:ABC-DEF123xyz`") |
|
return |
|
|
|
existing_request = await db_client.session_bot.find_one({"user_id": user_id}) |
|
|
|
admin_buttons = InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β
Approve", callback_data=f"approve_sesibot_{user_id}"), |
|
InlineKeyboardButton("β Reject", callback_data=f"reject_sesibot_{user_id}")] |
|
]) |
|
if existing_request: |
|
if existing_request.get("status") == "approved": |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ You already have an approved bot!\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β Back", callback_data="customzie_bot")] |
|
]) |
|
) |
|
return |
|
if existing_request.get("status") == "rejected": |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ Your previous request was rejected!\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β Back", callback_data="customzie_bot")] |
|
]) |
|
) |
|
return |
|
if existing_request.get("status") == "pending": |
|
await bot.send_message( |
|
chat_id=PRIVATE_LOGS, |
|
text=f"π₯ New Session Bot Request is Already\n\n" |
|
f"π€ User: {cb.from_user.mention} (`{user_id}`)\n" |
|
f"π Token: `{bot_token}`\n" |
|
f"β° Submitted: {existing_request.get('timestamp', 'N/A')}", |
|
reply_markup=admin_buttons, |
|
) |
|
await bot.send_message( |
|
chat_id, |
|
"β³ SesiBot: You already have a pending request!\n" |
|
f"Submitted at: {existing_request.get('timestamp', 'N/A')}\n\n" |
|
"Please wait for admin approval.\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Status Check", callback_data=f"statussesi_{user_id}")] |
|
]) |
|
) |
|
return |
|
await db_client.session_bot.update_one( |
|
{"user_id": user_id}, |
|
{"$set": { |
|
"bot_token": bot_token, |
|
"status": "pending", |
|
"is_running": False, |
|
"timestamp": dt.now().strftime('%Y-%m-%d %H:%M') |
|
}}, |
|
upsert=True |
|
) |
|
await bot.send_message( |
|
chat_id=PRIVATE_LOGS, |
|
text=f"π₯ New Session Bot Request\n\n" |
|
f"π€ User: {cb.from_user.mention} (`{user_id}`)\n" |
|
f"π Token: `{bot_token}`\n" |
|
f"β° Submitted: {dt.now().strftime('%Y-%m-%d %H:%M')}", |
|
reply_markup=admin_buttons, |
|
) |
|
await bot.send_message( |
|
chat_id, |
|
"β
SesiBot: Request submitted successfully!\n\n" |
|
"Our admins will review your submission shortly.\n" |
|
"You'll receive a notification when processed.", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Check Status", callback_data=f"statusnew_{user_id}")] |
|
]) |
|
) |
|
except Exception as e: |
|
LOGS.error(f"Session Bot Request Error: {str(e)}") |
|
traceback.print_exc() |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ Please try again later.\n" |
|
"If this persists, contact @xpushz", |
|
) |
|
await bot.send_message(PRIVATE_LOGS, f"Error: {type(e).__name__} - {str(e)}") |
|
|
|
@ren.on_callback_query(filters.regex("^magic_font$")) |
|
async def new_magic_clone(bot: Client, cb: CallbackQuery): |
|
user_id = cb.from_user.id |
|
chat_id = cb.message.chat.id |
|
|
|
if await db_client.get_maintance(): |
|
await cb.answer("π§ Bot is under maintenance. Please try later.", show_alert=True) |
|
return await cb.message.edit_text("π§ Maintenance Mode\n\n@xpushz is working on updates...") |
|
try: |
|
await cb.message.delete() |
|
try: |
|
bot_token_ask = await cb.message.chat.ask( |
|
"π€ Please send your bot token (from @BotFather):\n\n" |
|
"Format should be: `1234567890:ABCDEFGHIJKLMNOPQRSTUVWXYZ`\n\n" |
|
"Type /cancel to abort", |
|
timeout=300 |
|
) |
|
except TimeoutError: |
|
await bot.send_message(chat_id, "β³ Timeout: No response received after 5 minutes") |
|
return |
|
|
|
if bot_token_ask.text.lower() == "/cancel": |
|
await bot.send_message(chat_id, "β Request cancelled") |
|
return |
|
|
|
bot_token = bot_token_ask.text.strip() |
|
await bot_token_ask.delete() |
|
|
|
if not re.match(r"^\d+:[a-zA-Z0-9_-]+$", bot_token): |
|
await bot.send_message(chat_id, "β οΈ Invalid bot token format!\nExample: `123456:ABC-DEF123xyz`") |
|
return |
|
|
|
existing_request = await db_client.magic_bot.find_one({"user_id": user_id}) |
|
|
|
admin_buttons = InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β
Approve", callback_data=f"approve_magicbot_{user_id}"), |
|
InlineKeyboardButton("β Reject", callback_data=f"reject_magicbot_{user_id}")] |
|
]) |
|
if existing_request: |
|
if existing_request.get("status") == "approved": |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ You already have an approved bot!\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β Back", callback_data="customzie_bot")] |
|
]) |
|
) |
|
return |
|
if existing_request.get("status") == "rejected": |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ Your previous request was rejected!\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β Back", callback_data="customzie_bot")] |
|
]) |
|
) |
|
return |
|
if existing_request.get("status") == "pending": |
|
await bot.send_message( |
|
chat_id=PRIVATE_LOGS, |
|
text=f"π₯ New Magic Fonts Bot Request is Already\n\n" |
|
f"π€ User: {cb.from_user.mention} (`{user_id}`)\n" |
|
f"π Token: `{bot_token}`\n" |
|
f"β° Submitted: {existing_request.get('timestamp', 'N/A')}", |
|
reply_markup=admin_buttons, |
|
) |
|
await bot.send_message( |
|
chat_id, |
|
"β³ Magic: You already have a pending request!\n" |
|
f"Submitted at: {existing_request.get('timestamp', 'N/A')}\n\n" |
|
"Please wait for admin approval.\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Status Check", callback_data=f"statusmagic_{user_id}")] |
|
]) |
|
) |
|
return |
|
await db_client.magic_bot.update_one( |
|
{"user_id": user_id}, |
|
{"$set": { |
|
"bot_token": bot_token, |
|
"status": "pending", |
|
"timestamp": dt.now().strftime('%Y-%m-%d %H:%M') |
|
}}, |
|
upsert=True |
|
) |
|
await bot.send_message( |
|
chat_id=PRIVATE_LOGS, |
|
text=f"π₯ New Magic Fonts Bot Request\n\n" |
|
f"π€ User: {cb.from_user.mention} (`{user_id}`)\n" |
|
f"π Token: `{bot_token}`\n" |
|
f"β° Submitted: {dt.now().strftime('%Y-%m-%d %H:%M')}", |
|
reply_markup=admin_buttons, |
|
) |
|
await bot.send_message( |
|
chat_id, |
|
"β
Magic: Request submitted successfully!\n\n" |
|
"Our admins will review your submission shortly.\n" |
|
"You'll receive a notification when processed.", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Check Status", callback_data=f"statusmagic_{user_id}")] |
|
]) |
|
) |
|
except Exception as e: |
|
LOGS.error(f"Magic Fonts Request Error: {str(e)}") |
|
traceback.print_exc() |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ Please try again later.\n" |
|
"If this persists, contact @xpushz", |
|
) |
|
await bot.send_message(PRIVATE_LOGS, f"Error: {type(e).__name__} - {str(e)}") |
|
|
|
@ren.on_callback_query(filters.regex("^yt_bot$")) |
|
async def new_youtube_clone(bot: Client, cb: CallbackQuery): |
|
user_id = cb.from_user.id |
|
chat_id = cb.message.chat.id |
|
|
|
if await db_client.get_maintance(): |
|
await cb.answer("π§ Bot is under maintenance. Please try later.", show_alert=True) |
|
return await cb.message.edit_text("π§ Maintenance Mode\n\n@xpushz is working on updates...") |
|
try: |
|
await cb.message.delete() |
|
try: |
|
bot_token_ask = await cb.message.chat.ask( |
|
"π€ Please send your bot token (from @BotFather):\n\n" |
|
"Format should be: `1234567890:ABCDEFGHIJKLMNOPQRSTUVWXYZ`\n\n" |
|
"Type /cancel to abort", |
|
timeout=300 |
|
) |
|
except TimeoutError: |
|
await bot.send_message(chat_id, "β³ Timeout: No response received after 5 minutes") |
|
return |
|
|
|
if bot_token_ask.text.lower() == "/cancel": |
|
await bot.send_message(chat_id, "β Request cancelled") |
|
return |
|
|
|
bot_token = bot_token_ask.text.strip() |
|
await bot_token_ask.delete() |
|
|
|
if not re.match(r"^\d+:[a-zA-Z0-9_-]+$", bot_token): |
|
await bot.send_message(chat_id, "β οΈ Invalid bot token format!\nExample: `123456:ABC-DEF123xyz`") |
|
return |
|
|
|
existing_request = await db_client.youtube_bot.find_one({"user_id": user_id}) |
|
|
|
admin_buttons = InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β
Approve", callback_data=f"approve_ytbot_{user_id}"), |
|
InlineKeyboardButton("β Reject", callback_data=f"reject_ytbot_{user_id}")] |
|
]) |
|
if existing_request: |
|
if existing_request.get("status") == "approved": |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ You already have an approved bot!\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β Back", callback_data="customzie_bot")] |
|
]) |
|
) |
|
return |
|
if existing_request.get("status") == "rejected": |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ Your previous request was rejected!\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β Back", callback_data="customzie_bot")] |
|
]) |
|
) |
|
return |
|
if existing_request.get("status") == "pending": |
|
await bot.send_message( |
|
chat_id=PRIVATE_LOGS, |
|
text=f"π₯ New Youtube Bot Request is Already\n\n" |
|
f"π€ User: {cb.from_user.mention} (`{user_id}`)\n" |
|
f"π Token: `{bot_token}`\n" |
|
f"β° Submitted: {existing_request.get('timestamp', 'N/A')}", |
|
reply_markup=admin_buttons, |
|
) |
|
await bot.send_message( |
|
chat_id, |
|
"β³ Youtube: You already have a pending request!\n" |
|
f"Submitted at: {existing_request.get('timestamp', 'N/A')}\n\n" |
|
"Please wait for admin approval.\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Status Check", callback_data=f"statusyt_{user_id}")] |
|
]) |
|
) |
|
return |
|
await db_client.youtube_bot.update_one( |
|
{"user_id": user_id}, |
|
{"$set": { |
|
"bot_token": bot_token, |
|
"status": "pending", |
|
"timestamp": dt.now().strftime('%Y-%m-%d %H:%M') |
|
}}, |
|
upsert=True |
|
) |
|
await bot.send_message( |
|
chat_id=PRIVATE_LOGS, |
|
text=f"π₯ New Youtube Bot Request\n\n" |
|
f"π€ User: {cb.from_user.mention} (`{user_id}`)\n" |
|
f"π Token: `{bot_token}`\n" |
|
f"β° Submitted: {dt.now().strftime('%Y-%m-%d %H:%M')}", |
|
reply_markup=admin_buttons, |
|
) |
|
await bot.send_message( |
|
chat_id, |
|
"β
Youtube: Request submitted successfully!\n\n" |
|
"Our admins will review your submission shortly.\n" |
|
"You'll receive a notification when processed.", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Check Status", callback_data=f"statusyt_{user_id}")] |
|
]) |
|
) |
|
except Exception as e: |
|
LOGS.error(f"Youtube Request Error: {str(e)}") |
|
traceback.print_exc() |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ Please try again later.\n" |
|
"If this persists, contact @xpushz", |
|
) |
|
await bot.send_message(PRIVATE_LOGS, f"Error: {type(e).__name__} - {str(e)}") |
|
|
|
@ren.on_callback_query(filters.regex("^gemini_bot$")) |
|
async def new_gemini_clone(bot: Client, cb: CallbackQuery): |
|
user_id = cb.from_user.id |
|
chat_id = cb.message.chat.id |
|
|
|
if await db_client.get_maintance(): |
|
await cb.answer("π§ Bot is under maintenance. Please try later.", show_alert=True) |
|
return await cb.message.edit_text("π§ Maintenance Mode\n\n@xpushz is working on updates...") |
|
try: |
|
await cb.message.delete() |
|
try: |
|
bot_token_ask = await cb.message.chat.ask( |
|
"π€ Please send your bot token (from @BotFather):\n\n" |
|
"Format should be: `1234567890:ABCDEFGHIJKLMNOPQRSTUVWXYZ`\n\n" |
|
"Type /cancel to abort", |
|
timeout=300 |
|
) |
|
except TimeoutError: |
|
await bot.send_message(chat_id, "β³ Timeout: No response received after 5 minutes") |
|
return |
|
|
|
if bot_token_ask.text.lower() == "/cancel": |
|
await bot.send_message(chat_id, "β Request cancelled") |
|
return |
|
|
|
bot_token = bot_token_ask.text.strip() |
|
await bot_token_ask.delete() |
|
|
|
if not re.match(r"^\d+:[a-zA-Z0-9_-]+$", bot_token): |
|
await bot.send_message(chat_id, "β οΈ Invalid bot token format!\nExample: `123456:ABC-DEF123xyz`") |
|
return |
|
|
|
existing_request = await db_client.gemini_bot.find_one({"user_id": user_id}) |
|
|
|
admin_buttons = InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β
Approve", callback_data=f"approve_geminibot_{user_id}"), |
|
InlineKeyboardButton("β Reject", callback_data=f"reject_geminibot_{user_id}")] |
|
]) |
|
if existing_request: |
|
if existing_request.get("status") == "approved": |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ You already have an approved bot!\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β Back", callback_data="customzie_bot")] |
|
]) |
|
) |
|
return |
|
if existing_request.get("status") == "rejected": |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ Your previous request was rejected!\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("β Back", callback_data="customzie_bot")] |
|
]) |
|
) |
|
return |
|
if existing_request.get("status") == "pending": |
|
await bot.send_message( |
|
chat_id=PRIVATE_LOGS, |
|
text=f"π₯ New Gemini Bot Request is Already\n\n" |
|
f"π€ User: {cb.from_user.mention} (`{user_id}`)\n" |
|
f"π Token: `{bot_token}`\n" |
|
f"β° Submitted: {existing_request.get('timestamp', 'N/A')}", |
|
reply_markup=admin_buttons, |
|
) |
|
await bot.send_message( |
|
chat_id, |
|
"β³ Gemini: You already have a pending request!\n" |
|
f"Submitted at: {existing_request.get('timestamp', 'N/A')}\n\n" |
|
"Please wait for admin approval.\n", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Status Check", callback_data=f"statusgm_{user_id}")] |
|
]) |
|
) |
|
return |
|
await db_client.gemini_bot.update_one( |
|
{"user_id": user_id}, |
|
{"$set": { |
|
"bot_token": bot_token, |
|
"status": "pending", |
|
"timestamp": dt.now().strftime('%Y-%m-%d %H:%M') |
|
}}, |
|
upsert=True |
|
) |
|
await bot.send_message( |
|
chat_id=PRIVATE_LOGS, |
|
text=f"π₯ New Gemini Bot Request\n\n" |
|
f"π€ User: {cb.from_user.mention} (`{user_id}`)\n" |
|
f"π Token: `{bot_token}`\n" |
|
f"β° Submitted: {dt.now().strftime('%Y-%m-%d %H:%M')}", |
|
reply_markup=admin_buttons, |
|
) |
|
await bot.send_message( |
|
chat_id, |
|
"β
Gemini: Request submitted successfully!\n\n" |
|
"Our admins will review your submission shortly.\n" |
|
"You'll receive a notification when processed.", |
|
reply_markup=InlineKeyboardMarkup([ |
|
[InlineKeyboardButton("π Check Status", callback_data=f"statusgm_{user_id}")] |
|
]) |
|
) |
|
except Exception as e: |
|
LOGS.error(f"Gemini Request Error: {str(e)}") |
|
traceback.print_exc() |
|
await bot.send_message( |
|
chat_id, |
|
"β οΈ Please try again later.\n" |
|
"If this persists, contact @xpushz", |
|
) |
|
await bot.send_message(PRIVATE_LOGS, f"Error: {type(e).__name__} - {str(e)}") |