randydev's picture
fix update
5eaaf71
raw
history blame
12.2 kB
from pyrogram import *
from pyrogram.types import *
from pyrogram.enums import *
from akn.utils.database import db
# Permission set handed to Telegram by the /unmuteall handler: members may
# send messages, media and polls and invite users, but may not change the
# chat info or pin messages.
unmute_permissions = ChatPermissions(
    **{
        "can_send_messages": True,
        "can_send_media_messages": True,
        "can_send_polls": True,
        "can_change_info": False,
        "can_invite_users": True,
        "can_pin_messages": False,
    }
)
async def extract_userid(message, text: str):
    """Resolve a user reference taken from a command into a numeric user ID.

    Args:
        message: The command message; its ``entities`` and ``_client`` are
            used to resolve non-numeric references.
        text: The raw argument (numeric ID, ``@username`` or the visible
            text of a text-mention).

    Returns:
        The user ID as ``int``, or ``None`` when the second entity of the
        message is neither a mention nor a text-mention.

    Raises:
        Whatever ``get_users`` raises when the reference cannot be resolved.
    """
    text = text.strip()

    # A bare integer already is a user ID; no lookup is required.
    try:
        return int(text)
    except ValueError:
        pass

    # ``entities`` is None (not an empty list) when the message has none.
    entities = message.entities or []
    app = message._client

    # With fewer than two entities there is no mention entity to inspect
    # (presumably entity 0 is the bot command itself), so the text is
    # resolved directly as a username.
    if len(entities) < 2:
        return (await app.get_users(text)).id

    entity = entities[1]
    # Pyrogram v2 exposes entity types as enum members; comparing against
    # plain strings such as "mention" never matches.
    if entity.type == MessageEntityType.MENTION:
        return (await app.get_users(text)).id
    if entity.type == MessageEntityType.TEXT_MENTION:
        return entity.user.id
    return None
async def extract_user_and_reason(message, sender_chat=False):
    """Extract the target user ID and an optional reason from a command.

    The target is taken from the replied-to message when there is one,
    otherwise from the first argument of the command.

    Args:
        message: The command message.
        sender_chat: When true, a replied-to message that was sent on behalf
            of a chat (no ``from_user``) yields that chat's ID, unless the
            sender chat is the current chat itself.

    Returns:
        A ``(user_id, reason)`` tuple; either element may be ``None``.
    """
    # Commands attached to media arrive in ``caption`` and leave ``text`` None.
    text = message.text or message.caption or ""
    args = text.strip().split()

    reply = message.reply_to_message
    if reply:
        if reply.from_user:
            id_ = reply.from_user.id
        elif (
            sender_chat
            and reply.sender_chat
            # Compare IDs: comparing the chat object itself with an int is
            # always unequal, which let the current chat through as a target.
            and reply.sender_chat.id != message.chat.id
        ):
            id_ = reply.sender_chat.id
        else:
            return None, None

        # In reply mode everything after the command is the reason.
        reason = text.split(None, 1)[1] if len(args) >= 2 else None
        return id_, reason

    if len(args) == 2:
        return await extract_userid(message, args[1]), None

    if len(args) > 2:
        user, reason = text.split(None, 2)[1:]
        return await extract_userid(message, user), reason

    # Bare command without a reply: nothing to extract.
    return None, None
async def extract_user(message):
    """Return only the user ID that ``extract_user_and_reason`` finds."""
    user_id, _reason = await extract_user_and_reason(message)
    return user_id
@Client.on_message(
    ~filters.scheduled
    & filters.command(["ban", "dban"])
    & ~filters.forwarded
)
async def arz_ban_user(client: Client, message: Message):
    """Handle ``/ban`` and ``/dban``: ban the targeted user from the chat.

    ``/dban`` additionally deletes the replied-to message. On success a
    summary with an "Unban" inline button is posted.
    """
    # NOTE(review): nothing in this handler checks that the *sender* is an
    # admin -- confirm that this is enforced elsewhere.
    user_id, reason = await extract_user_and_reason(message, sender_chat=True)
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    # ``privileges`` is None when the client is not an administrator.
    # Errors are sent as replies: the command message belongs to the sender,
    # so this client cannot edit it.
    if not bot or not bot.can_restrict_members:
        return await message.reply_text("I don't have enough permissions")
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't ban myself.")
    if user_id == 1191668125:
        return await message.reply_text("I can't ban my developer!")

    reply = message.reply_to_message
    try:
        mention = (await client.get_users(user_id)).mention
    except IndexError:
        # No user came back (e.g. the target is a sender chat): fall back to
        # the chat title when one is available.
        mention = reply.sender_chat.title if reply and reply.sender_chat else "Anon"

    # Anonymous senders have no ``from_user``; keep the id-0 link fallback.
    sender = message.from_user
    if sender:
        actor_name, actor_id = sender.mention, sender.id
    else:
        actor_name = message.sender_chat.title if message.sender_chat else "Anon"
        actor_id = 0

    msg = (
        f"**Banned User:** <a href='tg://user?id={user_id}'>{mention}</a>\n"
        f"**Banned By:** [{actor_name}](tg://user?id={actor_id})\n"
    )
    # "dban" = delete-and-ban; only possible when the command is a reply.
    if message.command[0][0] == "d" and reply:
        await reply.delete()
    if reason:
        msg += f"**Reason:** {reason}"

    await message.chat.ban_member(user_id)
    await message.reply_text(
        msg,
        reply_markup=InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        "Unban", callback_data=f"arzunban_{user_id}"
                    )
                ]
            ]
        ),
    )
@Client.on_message(
    ~filters.scheduled
    & filters.command(["unban"])
    & ~filters.forwarded
)
async def arz_unban_user(client: Client, message: Message):
    """Handle ``/unban``: lift the ban of the targeted user in this chat."""
    # NOTE(review): nothing in this handler checks that the *sender* is an
    # admin -- confirm that this is enforced elsewhere.
    user_id, reason = await extract_user_and_reason(message, sender_chat=True)
    # Errors are sent as replies: the command message belongs to the sender,
    # so this client cannot edit it.
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't unban myself.")
    if user_id == 1191668125:
        return await message.reply_text("I can't unban my developer!")

    reply = message.reply_to_message
    try:
        mention = (await client.get_users(user_id)).mention
    except IndexError:
        # No user came back (e.g. the target is a sender chat): fall back to
        # the chat title when one is available.
        mention = reply.sender_chat.title if reply and reply.sender_chat else "Anon"

    # Anonymous senders have no ``from_user``; keep the id-0 link fallback.
    sender = message.from_user
    if sender:
        actor_name, actor_id = sender.mention, sender.id
    else:
        actor_name = message.sender_chat.title if message.sender_chat else "Anon"
        actor_id = 0

    msg = (
        f"**Unbanned User:** <a href='tg://user?id={user_id}'>{mention}</a>\n"
        f"**Unbanned By:** [{actor_name}](tg://user?id={actor_id})\n"
    )
    if reason:
        msg += f"**Reason:** {reason}"

    await message.chat.unban_member(user_id)
    await message.reply_text(msg)
@Client.on_message(
    ~filters.scheduled
    & filters.command(["banme"])
    & ~filters.forwarded
)
async def arz_ban_me(client: Client, message: Message):
    """Handle ``/banme``: ban the user who sent the command.

    The banned user ID is also stored under ``ban_user`` in this client's
    ``alldl_bot`` document, and a summary with an "Unban" button is posted.
    """
    # Anonymous/channel senders have no ``from_user``; dereferencing it
    # before the guard raised AttributeError instead of replying.
    sender = message.from_user
    if not sender or not sender.id:
        return await message.reply_text("I can't find that user.")
    user_id = sender.id

    # Errors are sent as replies: the command message belongs to the sender,
    # so this client cannot edit it.
    if user_id == client.me.id:
        return await message.reply_text("I can't ban myself.")
    if user_id == 1191668125:
        return await message.reply_text("I can't ban my developer!")

    reply = message.reply_to_message
    try:
        mention = (await client.get_users(user_id)).mention
    except IndexError:
        mention = reply.sender_chat.title if reply and reply.sender_chat else "Anon"

    # This command takes no reason, so the summary has only two lines.
    msg = (
        f"**Banned User:** <a href='tg://user?id={user_id}'>{mention}</a>\n"
        f"**Banned By:** [{sender.mention}](tg://user?id={sender.id})\n"
    )

    await message.chat.ban_member(user_id)
    # ``$set`` overwrites any previously stored ``ban_user`` value.
    await db.alldl_bot.update_one(
        {"bot_id": client.me.id},
        {"$set": {"ban_user": user_id}},
        upsert=True,
    )
    await message.reply_text(
        msg,
        reply_markup=InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        "Unban", callback_data=f"arzunban_{user_id}"
                    )
                ]
            ]
        ),
    )
@Client.on_message(
    ~filters.scheduled
    & filters.command(["mute", "dmute"])
    & ~filters.forwarded
)
async def arz_mute(client: Client, message: Message):
    """Handle ``/mute`` and ``/dmute``: strip all permissions from a user.

    ``/dmute`` additionally deletes the replied-to message. The user ID is
    added to the ``mute_user`` set of this client's ``alldl_bot`` document,
    and a summary with an "Unmute" inline button is posted.
    """
    # NOTE(review): nothing in this handler checks that the *sender* is an
    # admin -- confirm that this is enforced elsewhere.
    user_id, reason = await extract_user_and_reason(message, sender_chat=True)
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    # ``privileges`` is None when the client is not an administrator.
    # Errors are sent as replies: the command message belongs to the sender,
    # so this client cannot edit it.
    if not bot or not bot.can_restrict_members:
        return await message.reply_text("I don't have enough permissions")
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't mute myself.")
    if user_id == 1191668125:
        return await message.reply_text("I can't mute my developer!")

    reply = message.reply_to_message
    try:
        mention = (await client.get_users(user_id)).mention
    except IndexError:
        # No user came back (e.g. the target is a sender chat): fall back to
        # the chat title when one is available.
        mention = reply.sender_chat.title if reply and reply.sender_chat else "Anon"

    # Anonymous senders have no ``from_user``; keep the id-0 link fallback.
    sender = message.from_user
    if sender:
        actor_name, actor_id = sender.mention, sender.id
    else:
        actor_name = message.sender_chat.title if message.sender_chat else "Anon"
        actor_id = 0

    msg = (
        f"**Muted User:** <a href='tg://user?id={user_id}'>{mention}</a>\n"
        f"**Muted By:** [{actor_name}](tg://user?id={actor_id})\n"
    )
    # "dmute" = delete-and-mute; only possible when the command is a reply.
    if message.command[0][0] == "d" and reply:
        await reply.delete()
    if reason:
        msg += f"**Reason:** {reason}"

    # An empty ChatPermissions() grants nothing, i.e. a full mute.
    await message.chat.restrict_member(user_id, ChatPermissions())
    await db.alldl_bot.update_one(
        {"bot_id": client.me.id},
        {"$addToSet": {"mute_user": user_id}},
        upsert=True,
    )
    await message.reply_text(
        msg,
        reply_markup=InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        "Unmute", callback_data=f"arzunmute_{user_id}"
                    )
                ]
            ]
        ),
    )
@Client.on_message(
    ~filters.scheduled
    & filters.command(["unmute"])
    & ~filters.forwarded
)
async def arz_unmute(client: Client, message: Message):
    """Handle ``/unmute``: lift restrictions from the targeted user.

    The user ID is also removed from the ``mute_user`` list of this
    client's ``alldl_bot`` document.
    """
    # NOTE(review): nothing in this handler checks that the *sender* is an
    # admin -- confirm that this is enforced elsewhere.
    user_id, reason = await extract_user_and_reason(message, sender_chat=True)
    # Errors are sent as replies: the command message belongs to the sender,
    # so this client cannot edit it.
    if not user_id:
        return await message.reply_text("I can't find that user.")
    if user_id == client.me.id:
        return await message.reply_text("I can't unmute myself.")
    if user_id == 1191668125:
        return await message.reply_text("I can't unmute my developer!")

    reply = message.reply_to_message
    try:
        mention = (await client.get_users(user_id)).mention
    except IndexError:
        # No user came back (e.g. the target is a sender chat): fall back to
        # the chat title when one is available.
        mention = reply.sender_chat.title if reply and reply.sender_chat else "Anon"

    # Anonymous senders have no ``from_user``; keep the id-0 link fallback.
    sender = message.from_user
    if sender:
        actor_name, actor_id = sender.mention, sender.id
    else:
        actor_name = message.sender_chat.title if message.sender_chat else "Anon"
        actor_id = 0

    msg = (
        f"**Unmuted User:** <a href='tg://user?id={user_id}'>{mention}</a>\n"
        f"**Unmuted By:** [{actor_name}](tg://user?id={actor_id})\n"
    )
    if reason:
        msg += f"**Reason:** {reason}"

    # The restriction is lifted through the chat's unban call.
    await message.chat.unban_member(user_id)
    await db.alldl_bot.update_one(
        {"bot_id": client.me.id},
        {"$pull": {"mute_user": user_id}},
        upsert=True,
    )
    await message.reply_text(msg)
@Client.on_message(
    ~filters.scheduled
    & filters.command(["muteall"])
    & ~filters.forwarded
)
async def arz_mute_all(client: Client, message: Message):
    """Handle ``/muteall``: remove all default permissions in a supergroup.

    Only supergroups are acted upon; other non-private chat types are
    ignored silently.
    """
    # NOTE(review): nothing in this handler checks that the *sender* is an
    # admin -- confirm that this is enforced elsewhere.
    # Reject private chats first: a membership lookup is meaningless there
    # and would fail before this message could be sent.
    if message.chat.type == ChatType.PRIVATE:
        return await message.reply_text("This command is not available in private chats.")

    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    # ``privileges`` is None when the client is not an administrator.
    if not bot or not bot.can_restrict_members:
        return await message.reply_text("I don't have enough permissions")

    if message.chat.type != ChatType.SUPERGROUP:
        return

    try:
        # Change the chat-wide default permissions. Restricting
        # ``client.me.id`` would only target the client itself, not the
        # members.
        await client.set_chat_permissions(message.chat.id, ChatPermissions())
        await message.reply_text("All members muted successfully.")
    except Exception as e:
        # Best effort: report any failure back to the chat.
        await message.reply_text(f"Failed to mute all members: {e}")
@Client.on_message(
    ~filters.scheduled
    & filters.command(["unmuteall"])
    & ~filters.forwarded
)
async def arz_unmute_all(client: Client, message: Message):
    """Handle ``/unmuteall``: restore default permissions in a supergroup.

    The chat-wide defaults are set to the module-level
    ``unmute_permissions``. Only supergroups are acted upon; other
    non-private chat types are ignored silently.
    """
    # NOTE(review): nothing in this handler checks that the *sender* is an
    # admin -- confirm that this is enforced elsewhere.
    # Reject private chats first: a membership lookup is meaningless there
    # and would fail before this message could be sent.
    if message.chat.type == ChatType.PRIVATE:
        return await message.reply_text("This command is not available in private chats.")

    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    # ``privileges`` is None when the client is not an administrator.
    if not bot or not bot.can_restrict_members:
        return await message.reply_text("I don't have enough permissions")

    if message.chat.type != ChatType.SUPERGROUP:
        return

    try:
        # Change the chat-wide default permissions. Restricting
        # ``client.me.id`` would only target the client itself, not the
        # members.
        await client.set_chat_permissions(message.chat.id, unmute_permissions)
        await message.reply_text("All members unmuted successfully.")
    except Exception as e:
        # Best effort: report any failure back to the chat.
        await message.reply_text(f"Failed to unmute all members: {e}")
@Client.on_message(filters.incoming & filters.group, group=-1)
async def check_user_mute(client: Client, message: Message):
    """Re-apply the mute to a stored muted user who posts in a group.

    The sender is looked up in the ``mute_user`` list of this client's
    ``alldl_bot`` document; on a match all send permissions are revoked in
    the current chat, provided the client may restrict members.
    """
    sender = message.from_user
    # Anonymous-admin and channel posts have no ``from_user``; skip them
    # (and bots) instead of failing on ``None.id``.
    if not sender or sender.is_bot:
        return
    if message.chat.type == ChatType.PRIVATE:
        return

    # Check the stored mute list before asking Telegram for privileges, so
    # that messages from non-muted users cost no API round trip.
    record = await db.alldl_bot.find_one({"bot_id": client.me.id})
    if not record or sender.id not in (record.get("mute_user") or ()):
        return

    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    # ``privileges`` is None when the client is not an administrator.
    if not bot or not bot.can_restrict_members:
        return

    await client.restrict_chat_member(
        message.chat.id,
        sender.id,
        permissions=ChatPermissions(
            can_send_messages=False,
            can_send_media_messages=False,
            can_send_polls=False,
            can_change_info=False,
            can_invite_users=False,
            can_pin_messages=False,
        ),
    )