|
import logging
from datetime import datetime as dt

from pyrogram import *
from pyrogram.types import *
from pyrogram.enums import *

from akn.utils.database import db
|
|
|
# Permission set handed to Telegram when a mute is lifted: every "send"
# capability and inviting are switched on, while changing chat info and
# pinning messages stay off.
unmute_permissions = ChatPermissions(
    can_change_info=False,
    can_pin_messages=False,
    can_invite_users=True,
    can_send_messages=True,
    can_send_media_messages=True,
    can_send_other_messages=True,
    can_send_polls=True,
    can_add_web_page_previews=True,
)
|
|
|
|
|
async def extract_userid(message, text: str):
    """Resolve the user reference in *text* to a numeric user id.

    Args:
        message: The command message; its ``entities`` and ``_client`` are
            used to resolve non-numeric references.
        text: The raw argument (a numeric id or a username/mention).

    Returns:
        The user id as ``int``, or ``None`` when the second entity of the
        message is neither a mention nor a text mention.
    """
    text = text.strip()

    # A plain (optionally signed) integer already is the user id.
    try:
        return int(text)
    except ValueError:
        pass

    # ``entities`` is None when the message carries no entities at all.
    entities = message.entities or []
    app = message._client
    if len(entities) < 2:
        # Only the command entity (or nothing): treat the text as a username.
        return (await app.get_users(text)).id

    # entities[0] is presumably the bot command, entities[1] the argument.
    entity = entities[1]
    # Entity types are enum members; comparing them with plain strings such
    # as "mention" never matches.
    if entity.type == MessageEntityType.MENTION:
        return (await app.get_users(text)).id
    if entity.type == MessageEntityType.TEXT_MENTION:
        return entity.user.id
    return None
|
|
|
# In-memory map of chat id -> list of admin user ids, filled by
# arz_update_admin_cache.
cached_admins = dict()
|
|
|
async def arz_update_admin_cache(client, chat_id):
    """Refresh the cached admin ids of *chat_id* and return them.

    Bots and deleted accounts among the administrators are left out. The
    resulting list is stored in ``cached_admins[chat_id]``.
    """
    administrators = client.get_chat_members(
        chat_id, filter=ChatMembersFilter.ADMINISTRATORS
    )
    human_admin_ids = [
        member.user.id
        async for member in administrators
        if not (member.user.is_bot or member.user.is_deleted)
    ]
    cached_admins[chat_id] = human_admin_ids
    return human_admin_ids
|
|
|
@Client.on_callback_query(filters.regex(r"^arzunmuted_(\-?\d+)_(\d+)$"))
async def xarzunmute_cbok(client, callback):
    """Handle the inline "Unmute" button.

    The callback data has the form ``arzunmuted_<chat_id>_<user_id>``. The
    button only works for users listed in the stored ``chat_admins`` of the
    document that contains the chat id; on success the user is unrestricted,
    removed from the stored ``mute_user`` list and the message is edited.
    """
    chat_id = int(callback.matches[0].group(1))
    user_id = int(callback.matches[0].group(2))
    chat_member = await client.get_chat_member(chat_id, client.me.id)
    if not chat_member.privileges or not chat_member.privileges.can_restrict_members:
        await callback.answer("β οΈ Bot needs 'Restrict Members' permission!", show_alert=True)
        # Without the right to restrict members the unmute cannot succeed,
        # so stop instead of falling through to the attempt below.
        return
    try:
        chat_data = await db.alldl_bot.find_one({"chat_ids": chat_id})
        admins = chat_data.get("chat_admins", []) if chat_data else []
        if not admins:
            # No document or no stored admin list: /reload has to be run.
            await callback.answer("β οΈ You can /reload again admins", show_alert=True)
            return
        if callback.from_user.id not in admins:
            await callback.answer("β οΈ Only admins can unmute!", show_alert=True)
            return
        user_mention = callback.from_user.mention
        await client.restrict_chat_member(chat_id, user_id, unmute_permissions)
        await db.alldl_bot.update_one(
            {"bot_id": client.me.id},
            {"$pull": {"mute_user": user_id}},
            upsert=True,
        )
        await callback.message.edit_text(
            f"✅ User {user_id} unmuted by {user_mention}",
            reply_markup=None,
        )
    except Exception as e:
        # Boundary handler: record the traceback and report the failure in
        # the message that carried the button.
        logging.getLogger(__name__).exception("Error xarzunmute_cbok: %s", e)
        await callback.message.edit_text(f"β Failed: {str(e)}")
|
|
|
@Client.on_callback_query(filters.regex(r"^arzunband_(\-?\d+)_(\d+)$"))
async def xarzunban_cbok(client, callback):
    """Handle the inline "Unban" button.

    The callback data has the form ``arzunband_<chat_id>_<user_id>``. The
    button only works for users listed in the stored ``chat_admins`` of the
    document that contains the chat id; on success the user is unbanned and
    the message is edited.
    """
    chat_id = int(callback.matches[0].group(1))
    user_id = int(callback.matches[0].group(2))
    chat_member = await client.get_chat_member(chat_id, client.me.id)
    if not chat_member.privileges or not chat_member.privileges.can_restrict_members:
        await callback.answer("β οΈ Bot needs 'Restrict Members' permission!", show_alert=True)
        # Without the right to restrict members the unban cannot succeed,
        # so stop instead of falling through to the attempt below.
        return
    try:
        # One lookup is enough; the document holds the stored admin list.
        chat_data = await db.alldl_bot.find_one({"chat_ids": chat_id})
        admins = chat_data.get("chat_admins", []) if chat_data else []
        if not admins:
            # No document or no stored admin list: /reload has to be run.
            await callback.answer("β οΈ You can /reload again admins", show_alert=True)
            return
        if callback.from_user.id not in admins:
            await callback.answer("β οΈ Only admins can unban!", show_alert=True)
            return
        user_mention = callback.from_user.mention
        await client.unban_chat_member(chat_id, user_id)
        await callback.message.edit_text(
            f"✅ User {user_id} unbanned by {user_mention}",
            reply_markup=None,
        )
    except Exception as e:
        # Boundary handler: record the traceback and report the failure in
        # the message that carried the button.
        logging.getLogger(__name__).exception("Error xarzunban_cbok: %s", e)
        await callback.message.edit_text(f"β Failed: {str(e)}")
|
|
|
async def extract_user_and_reason(message, sender_chat=False):
    """Work out the target id and the optional reason of a moderation command.

    Args:
        message: The command message.
        sender_chat: When true, a replied-to message that was sent on behalf
            of another chat yields that chat's id as the target.

    Returns:
        A ``(target_id, reason)`` tuple. ``target_id`` is ``None`` when no
        target can be determined; ``reason`` is ``None`` when none was given.
    """
    # ``text`` is None for messages without text (e.g. a captioned media).
    text = message.text or ""
    args = text.strip().split()

    reply = message.reply_to_message
    if reply:
        if reply.from_user:
            id_ = reply.from_user.id
        elif (
            sender_chat
            and reply.sender_chat
            # Compare ids: a message sent as the group itself (same id as the
            # current chat) must not become a target.
            and reply.sender_chat.id != message.chat.id
        ):
            id_ = reply.sender_chat.id
        else:
            return None, None

        # With a reply, everything after the command is the reason.
        reason = text.split(None, 1)[1] if len(args) >= 2 else None
        return id_, reason

    if len(args) == 2:
        return await extract_userid(message, args[1]), None

    if len(args) > 2:
        user, reason = text.split(None, 2)[1:]
        return await extract_userid(message, user), reason

    # Bare command without reply or arguments.
    return None, None
|
|
|
|
|
async def extract_user(message):
    """Return only the target id that ``extract_user_and_reason`` finds."""
    target_id, _reason = await extract_user_and_reason(message)
    return target_id
|
|
|
@Client.on_message(
    ~filters.scheduled & filters.command(["reload"]) & ~filters.forwarded
)
async def arz_reload(client: Client, message: Message):
    """Handle ``/reload``: rebuild the admin list and store it in the database.

    Messages without ``from_user`` are ignored. The bot must be allowed to
    restrict members; otherwise an error reply is sent and nothing is stored.
    """
    if not message.from_user:
        return

    chat_id = message.chat.id
    own_membership = await client.get_chat_member(chat_id, client.me.id)
    privileges = own_membership.privileges
    if not (privileges and privileges.can_restrict_members):
        return await message.reply_text("β I need admin permissions to restrict members")

    admin_ids = await arz_update_admin_cache(client, chat_id)

    # One document per bot: remember the chat and overwrite the admin list.
    selector = {"bot_id": client.me.id}
    changes = {
        "$addToSet": {"chat_ids": chat_id},
        "$set": {"chat_admins": admin_ids, "last_updated": dt.now()},
    }
    await db.alldl_bot.update_one(selector, changes, upsert=True)

    await message.reply_text("π Admin list reloaded successfully!")
|
|
|
@Client.on_message(
    ~filters.scheduled
    & filters.command(["ban", "dban"])
    & ~filters.forwarded
)
async def arz_ban_user(client: Client, message: Message):
    """Handle ``/ban`` and ``/dban``.

    The target is taken from the replied-to message or from the first
    argument; the remaining text is shown as the reason. ``/dban`` also
    deletes the replied-to message. The reply carries an "Unban" button.
    """
    # NOTE(review): nothing in this handler verifies that the caller is a
    # chat admin - confirm that authorization is enforced elsewhere.
    user_id, reason = await extract_user_and_reason(message, sender_chat=True)
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_restrict_members:
        return await message.reply_text("I don't have enough permissions")
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't ban myself.")
    if user_id == 1191668125:
        return await message.reply_text("I can't ban my developer!")
    try:
        mention = (await client.get_users(user_id)).mention
    except IndexError:
        # No user came back for this id (presumably a channel target): fall
        # back to the replied sender chat's title when there is one.
        reply = message.reply_to_message
        mention = reply.sender_chat.title if reply and reply.sender_chat else "Anon"

    # Messages sent as a chat (e.g. anonymously) carry no ``from_user``.
    if message.from_user:
        actor = message.from_user.mention
        actor_id = message.from_user.id
    else:
        actor = message.sender_chat.title if message.sender_chat else "Anon"
        actor_id = 0
    msg = (
        f"**Banned User:** <a href='tg://user?id={user_id}'>{mention}</a>\n"
        f"**Banned By:** [{actor}](tg://user?id={actor_id})\n"
    )
    # "dban": only delete when there actually is a replied-to message.
    if message.command[0][0] == "d" and message.reply_to_message:
        await message.reply_to_message.delete()
    if reason:
        msg += f"**Reason:** {reason}"
    await message.chat.ban_member(user_id)
    await message.reply_text(
        msg,
        reply_markup=InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        "β« Unban", callback_data=f"arzunband_{message.chat.id}_{user_id}"
                    )
                ]
            ]
        ),
    )
|
|
|
@Client.on_message(
    ~filters.scheduled
    & filters.command(["unban"])
    & ~filters.forwarded
)
async def arz_unban_user(client: Client, message: Message):
    """Handle ``/unban``.

    The target is taken from the replied-to message or from the first
    argument; the remaining text is shown as the reason.
    """
    # NOTE(review): nothing in this handler verifies that the caller is a
    # chat admin - confirm that authorization is enforced elsewhere.
    user_id, reason = await extract_user_and_reason(message, sender_chat=True)
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_restrict_members:
        return await message.reply_text("I don't have enough permissions")
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't unban myself.")
    if user_id == 1191668125:
        return await message.reply_text("I can't unban my developer!")
    try:
        mention = (await client.get_users(user_id)).mention
    except IndexError:
        # No user came back for this id (presumably a channel target): fall
        # back to the replied sender chat's title when there is one.
        reply = message.reply_to_message
        mention = reply.sender_chat.title if reply and reply.sender_chat else "Anon"

    # Messages sent as a chat (e.g. anonymously) carry no ``from_user``.
    if message.from_user:
        actor = message.from_user.mention
        actor_id = message.from_user.id
    else:
        actor = message.sender_chat.title if message.sender_chat else "Anon"
        actor_id = 0
    msg = (
        f"**Unbanned User:** <a href='tg://user?id={user_id}'>{mention}</a>\n"
        f"**Unbanned By:** [{actor}](tg://user?id={actor_id})\n"
    )
    if reason:
        msg += f"**Reason:** {reason}"
    await message.chat.unban_member(user_id)
    await message.reply_text(msg)
|
|
|
@Client.on_message(
    ~filters.scheduled
    & filters.command(["banme"])
    & ~filters.forwarded
)
async def arz_ban_me(client: Client, message: Message):
    """Handle ``/banme``: ban the sender and reply with an "Unban" button."""
    # Messages sent as a chat (e.g. anonymously) carry no ``from_user``.
    if not message.from_user:
        return await message.reply_text("I can't find that user.")
    user_id = message.from_user.id
    # Same precondition as the other moderation commands in this file.
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_restrict_members:
        return await message.reply_text("I don't have enough permissions")
    if user_id == client.me.id:
        return await message.reply_text("I can't ban myself.")
    if user_id == 1191668125:
        return await message.reply_text("I can't ban my developer!")

    # The sender is both the target and the actor here.
    mention = message.from_user.mention
    msg = (
        f"**Banned User:** <a href='tg://user?id={user_id}'>{mention}</a>\n"
        f"**Banned By:** [{mention}](tg://user?id={user_id})\n"
    )
    await message.chat.ban_member(user_id)
    await message.reply_text(
        msg,
        reply_markup=InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        # Prefix must match the unban callback handler's
                        # pattern (``arzunband_<chat_id>_<user_id>``).
                        "β« Unban", callback_data=f"arzunband_{message.chat.id}_{user_id}"
                    )
                ]
            ]
        ),
    )
|
|
|
@Client.on_message(
    ~filters.scheduled
    & filters.command(["mute", "dmute"])
    & ~filters.forwarded
)
async def arz_mute(client: Client, message: Message):
    """Handle ``/mute`` and ``/dmute``.

    The target is taken from the replied-to message or from the first
    argument; the remaining text is shown as the reason. ``/dmute`` also
    deletes the replied-to message. The target is restricted with an empty
    permission set, added to the stored ``mute_user`` list, and the reply
    carries an "Unmute" button.
    """
    # NOTE(review): nothing in this handler verifies that the caller is a
    # chat admin - confirm that authorization is enforced elsewhere.
    user_id, reason = await extract_user_and_reason(message, sender_chat=True)
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_restrict_members:
        return await message.reply_text("I don't have enough permissions")
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't mute myself.")
    if user_id == 1191668125:
        return await message.reply_text("I can't mute my developer!")
    try:
        mention = (await client.get_users(user_id)).mention
    except IndexError:
        # No user came back for this id (presumably a channel target): fall
        # back to the replied sender chat's title when there is one.
        reply = message.reply_to_message
        mention = reply.sender_chat.title if reply and reply.sender_chat else "Anon"

    # Messages sent as a chat (e.g. anonymously) carry no ``from_user``.
    if message.from_user:
        actor = message.from_user.mention
        actor_id = message.from_user.id
    else:
        actor = message.sender_chat.title if message.sender_chat else "Anon"
        actor_id = 0
    msg = (
        f"**Muted User:** <a href='tg://user?id={user_id}'>{mention}</a>\n"
        f"**Muted By:** [{actor}](tg://user?id={actor_id})\n"
    )
    # "dmute": only delete when there actually is a replied-to message.
    if message.command[0][0] == "d" and message.reply_to_message:
        await message.reply_to_message.delete()
    if reason:
        msg += f"**Reason:** {reason}"
    await message.chat.restrict_member(user_id, ChatPermissions())
    await db.alldl_bot.update_one(
        {"bot_id": client.me.id},
        {"$addToSet": {"mute_user": user_id}},
        upsert=True,
    )
    await message.reply_text(
        msg,
        reply_markup=InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        "β« Unmute", callback_data=f"arzunmuted_{message.chat.id}_{user_id}"
                    )
                ]
            ]
        ),
    )
|
|
|
@Client.on_message(
    ~filters.scheduled
    & filters.command(["unmute"])
    & ~filters.forwarded
)
async def arz_unmute(client: Client, message: Message):
    """Handle ``/unmute``.

    The target is taken from the replied-to message or from the first
    argument; the remaining text is shown as the reason. The target gets
    ``unmute_permissions``, is unbanned, and is removed from the stored
    ``mute_user`` list.
    """
    # NOTE(review): nothing in this handler verifies that the caller is a
    # chat admin - confirm that authorization is enforced elsewhere.
    user_id, reason = await extract_user_and_reason(message, sender_chat=True)
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_restrict_members:
        return await message.reply_text("I don't have enough permissions")
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't unmute myself.")
    if user_id == 1191668125:
        return await message.reply_text("I can't unmute my developer!")
    try:
        mention = (await client.get_users(user_id)).mention
    except IndexError:
        # No user came back for this id (presumably a channel target): fall
        # back to the replied sender chat's title when there is one.
        reply = message.reply_to_message
        mention = reply.sender_chat.title if reply and reply.sender_chat else "Anon"

    # Messages sent as a chat (e.g. anonymously) carry no ``from_user``.
    if message.from_user:
        actor = message.from_user.mention
        actor_id = message.from_user.id
    else:
        actor = message.sender_chat.title if message.sender_chat else "Anon"
        actor_id = 0
    msg = (
        f"**Unmuted User:** <a href='tg://user?id={user_id}'>{mention}</a>\n"
        f"**Unmuted By:** [{actor}](tg://user?id={actor_id})\n"
    )
    if reason:
        msg += f"**Reason:** {reason}"
    await message.chat.restrict_member(user_id, unmute_permissions)
    await message.chat.unban_member(user_id)
    await db.alldl_bot.update_one(
        {"bot_id": client.me.id},
        {"$pull": {"mute_user": user_id}},
        upsert=True,
    )
    await message.reply_text(msg)
|
|
|
@Client.on_message(
    ~filters.scheduled
    & filters.command(["muteall"])
    & ~filters.forwarded
)
async def arz_mute_all(client: Client, message: Message):
    """Handle ``/muteall``: set an empty default permission set on a supergroup.

    Private chats get an explanatory reply. Chat types other than
    supergroups are left untouched without a reply.
    """
    # NOTE(review): nothing in this handler verifies that the caller is a
    # chat admin - confirm that authorization is enforced elsewhere.
    # Reject private chats before asking for the bot's membership, the same
    # order arz_admin_list uses; a membership lookup is presumably not
    # possible in a private chat - TODO confirm.
    if message.chat.type == ChatType.PRIVATE:
        return await message.reply_text("This command is not available in private chats.")
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_restrict_members:
        return await message.reply_text("I don't have enough permissions")
    if message.chat.type == ChatType.SUPERGROUP:
        try:
            await client.set_chat_permissions(
                message.chat.id,
                permissions=ChatPermissions(),
            )
            await message.reply_text("All members muted successfully.")
        except Exception as e:
            # Report the failure in the chat instead of raising.
            await message.reply_text(f"Failed to mute all members: {e}")
|
|
|
@Client.on_message(
    ~filters.scheduled
    & filters.command(["unmuteall"])
    & ~filters.forwarded
)
async def arz_unmute_all(client: Client, message: Message):
    """Handle ``/unmuteall``: set ``unmute_permissions`` as a supergroup's defaults.

    Private chats get an explanatory reply. Chat types other than
    supergroups are left untouched without a reply.
    """
    # NOTE(review): nothing in this handler verifies that the caller is a
    # chat admin - confirm that authorization is enforced elsewhere.
    # Reject private chats before asking for the bot's membership, the same
    # order arz_admin_list uses; a membership lookup is presumably not
    # possible in a private chat - TODO confirm.
    if message.chat.type == ChatType.PRIVATE:
        return await message.reply_text("This command is not available in private chats.")
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_restrict_members:
        return await message.reply_text("I don't have enough permissions")
    if message.chat.type == ChatType.SUPERGROUP:
        try:
            await client.set_chat_permissions(
                message.chat.id,
                permissions=unmute_permissions,
            )
            await message.reply_text("All members unmuted successfully.")
        except Exception as e:
            # Report the failure in the chat instead of raising.
            await message.reply_text(f"Failed to unmute all members: {e}")
|
|
|
@Client.on_message(
    ~filters.scheduled
    & filters.command(["muteme"])
    & ~filters.forwarded
)
async def arz_mute_me(client: Client, message: Message):
    """Handle ``/muteme``: mute the sender and reply with an "Unmute" button.

    The sender is restricted with an empty permission set and added to the
    stored ``mute_user`` list.
    """
    # Messages sent as a chat (e.g. anonymously) carry no ``from_user``.
    if not message.from_user:
        return await message.reply_text("I can't find that user.")
    user_id = message.from_user.id
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_restrict_members:
        return await message.reply_text("I don't have enough permissions")
    if user_id == client.me.id:
        return await message.reply_text("I can't mute myself.")
    if user_id == 1191668125:
        return await message.reply_text("I can't mute my developer!")

    # The sender is both the target and the actor here.
    mention = message.from_user.mention
    msg = (
        f"**Muted User:** <a href='tg://user?id={user_id}'>{mention}</a>\n"
        f"**Muted By:** [{mention}](tg://user?id={user_id})\n"
    )
    await message.chat.restrict_member(user_id, ChatPermissions())
    await db.alldl_bot.update_one(
        {"bot_id": client.me.id},
        {"$addToSet": {"mute_user": user_id}},
        upsert=True,
    )
    await message.reply_text(
        msg,
        reply_markup=InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        # The unmute callback handler's pattern requires
                        # both ids: ``arzunmuted_<chat_id>_<user_id>``.
                        "β« Unmute", callback_data=f"arzunmuted_{message.chat.id}_{user_id}"
                    )
                ]
            ]
        ),
    )
|
|
|
@Client.on_message(
    ~filters.scheduled
    & filters.command(["adminlist"])
    & ~filters.forwarded
)
async def arz_admin_list(client: Client, message: Message):
    """Handle ``/adminlist``: reply with the chat's human administrators.

    Bots and deleted accounts are skipped. Each entry is labelled as owner
    or admin. Private chats get an explanatory reply, and the bot must be
    allowed to restrict members.
    """
    if message.chat.type == ChatType.PRIVATE:
        return await message.reply_text("This command is not available in private chats.")

    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_restrict_members:
        return await message.reply_text("I don't have enough permissions to list admins.")

    try:
        admin_listx = []
        async for m in client.get_chat_members(
            message.chat.id, filter=ChatMembersFilter.ADMINISTRATORS
        ):
            user = m.user
            if user.is_bot or user.is_deleted:
                continue
            tag = f"[{user.first_name}](tg://user?id={user.id})"
            role = "π Owner" if m.status.value == "owner" else "π§ Admin"
            admin_listx.append(f"β£ {role}: {tag}")

        # Check the list that was just built.
        if not admin_listx:
            return await message.reply_text("No admins found.")

        text = "**π₯ Admin List:**\n\n" + "\n".join(admin_listx)
        await message.reply_text(
            text,
            disable_web_page_preview=True,
        )
    except Exception as e:
        # Report the failure in the chat instead of raising.
        await message.reply_text(f"Failed to fetch admin list: `{e}`")
|
|
|
@Client.on_message(filters.incoming & filters.group, group=-1)
async def check_user_mute(client: Client, message: Message):
    """Re-apply the mute restriction when a stored muted user posts in a group.

    Runs for every incoming group message. Senders without ``from_user``,
    bots and deleted accounts are ignored. If the sender's id is in the
    stored ``mute_user`` list and the bot may restrict members, the sender
    is restricted again in this chat.
    """
    sender = message.from_user
    # Messages sent as a chat (e.g. anonymously) carry no ``from_user``.
    if not sender or sender.is_deleted or sender.is_bot:
        return
    if message.chat.type == ChatType.PRIVATE:
        return

    muted_users = await db.alldl_bot.find_one({"bot_id": client.me.id})
    if not muted_users or "mute_user" not in muted_users:
        return
    if sender.id not in muted_users["mute_user"]:
        return

    # Look up the bot's own privileges only once a muted sender is found,
    # so ordinary messages do not cost an extra API request.
    bot = await client.get_chat_member(message.chat.id, client.me.id)
    if not bot.privileges or not bot.privileges.can_restrict_members:
        return

    await client.restrict_chat_member(
        message.chat.id,
        sender.id,
        permissions=ChatPermissions(
            can_send_messages=False,
            can_send_media_messages=False,
            can_send_polls=False,
            can_change_info=False,
            can_invite_users=False,
            can_pin_messages=False,
        ),
    )