|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import asyncio |
|
import time |
|
from time import time as waktu |
|
|
|
from pyrogram import Client |
|
from pyrogram import Client as ren |
|
from pyrogram import * |
|
from pyrogram import filters |
|
from pyrogram.errors import * |
|
from pyrogram.types import * |
|
|
|
from akn.utils.handler import * |
|
from akn.Akeno.help import add_command_help |
|
from akn.utils.logger import LOGS |
|
from akn.utils.prefixprem import command |
|
from config import * |
|
|
|
# Per-chat cache of administrator ids, keyed by chat id.  Each value is a
# dict holding "last_updated_at" (a timestamp) and "data" (a list of user
# ids); list_admins() fills and reads it.
admins_in_chat = dict()

# Permissions handed back to a member by the unmute command: sending
# messages, media and polls and inviting users are allowed, while changing
# chat info and pinning messages stay disabled.
unmute_permissions = ChatPermissions(
    can_send_messages=True,
    can_send_media_messages=True,
    can_send_polls=True,
    can_change_info=False,
    can_invite_users=True,
    can_pin_messages=False,
)
|
|
|
|
|
async def extract_userid(message, text: str):
    """Resolve the user token of a command message to a user id.

    Args:
        message: The command message.  Its ``entities``, ``text`` and bound
            ``_client`` are consulted.
        text: The user token taken from the command: a numeric id, a
            username, or the first word of a text mention.

    Returns:
        The user id as an ``int``.

    Raises:
        Whatever ``get_users`` raises when ``text`` cannot be resolved
        (the exact exception types depend on the client library).
    """

    def entity_kind(entity) -> str:
        # Newer Pyrogram exposes an enum member (``.name == "TEXT_MENTION"``),
        # older versions a plain string ("text_mention"); normalise both.
        kind = entity.type
        return str(getattr(kind, "name", kind)).lower()

    def utf16_units(chunk: str) -> int:
        # Telegram expresses entity offsets in UTF-16 code units.
        return len(chunk.encode("utf-16-le")) // 2

    text = text.strip()

    # A purely numeric token already is a user id.
    try:
        return int(text)
    except ValueError:
        pass

    # Work out where the user token starts inside the message so that only an
    # entity sitting exactly on that token is trusted.  A text mention that
    # appears later (for example inside the reason) must not become the target.
    raw = getattr(message, "text", None) or ""
    pieces = raw.split(None, 1)
    token_offset = None
    if len(pieces) == 2 and pieces[1].startswith(text):
        token_offset = utf16_units(raw[: len(raw) - len(pieces[1])])

    # ``entities`` is None for messages without any entity.
    for entity in message.entities or []:
        if entity_kind(entity) == "text_mention" and entity.offset == token_offset:
            return entity.user.id

    # Usernames (with or without a mention entity) are resolved by the client.
    return (await message._client.get_users(text)).id
|
|
|
|
|
async def extract_user_and_reason(message, sender_chat=False):
    """Extract the target id and the optional reason from a command message.

    When the command replies to a message, the target is the author of that
    message and everything after the command word is the reason.  Otherwise
    the first argument is the user reference (resolved with
    ``extract_userid``) and any remaining text is the reason.

    Args:
        message: The command message.
        sender_chat: When true, a reply to a message sent on behalf of
            another chat (``reply.sender_chat``) yields that chat's id.

    Returns:
        A ``(target_id, reason)`` tuple; either element may be ``None``.
    """
    # ``text`` is None for messages without text; treat that as "no arguments".
    text = message.text or ""
    args = text.split()

    reply = message.reply_to_message
    if reply:
        if reply.from_user:
            target = reply.from_user.id
        elif (
            sender_chat
            and reply.sender_chat
            # Compare ids: comparing the chat object itself with an int is
            # always "different", which made messages sent as the group
            # itself look like a foreign channel.
            and reply.sender_chat.id != message.chat.id
        ):
            target = reply.sender_chat.id
        else:
            return None, None

        reason = text.split(None, 1)[1] if len(args) > 1 else None
        return target, reason

    if len(args) == 2:
        return await extract_userid(message, args[1]), None

    if len(args) > 2:
        user, reason = text.split(None, 2)[1:]
        return await extract_userid(message, user), reason

    # Bare command without a reply: nothing to extract.
    return None, None
|
|
|
|
|
async def extract_user(message):
    """Return only the target id that extract_user_and_reason() finds."""
    user_id, _reason = await extract_user_and_reason(message)
    return user_id
|
|
|
|
|
async def list_admins(client: "Client", chat_id: int):
    """Return the user ids of the administrators of ``chat_id``.

    Results are kept in the module-level ``admins_in_chat`` cache and reused
    for 3600 seconds before the member list is fetched again.

    Args:
        client: Client used to iterate the chat members.
        chat_id: Id of the chat whose administrators are wanted.

    Returns:
        A list of administrator user ids.
    """
    cached = admins_in_chat.get(chat_id)
    # ``waktu`` is the file's alias for ``time.time``; the bare name ``time``
    # is the module (``import time``) and cannot be called.
    if cached is not None and waktu() - cached["last_updated_at"] < 3600:
        return cached["data"]

    fetched_at = waktu()
    admin_ids = [
        member.user.id
        async for member in client.get_chat_members(
            chat_id, filter=enums.ChatMembersFilter.ADMINISTRATORS
        )
    ]
    admins_in_chat[chat_id] = {"last_updated_at": fetched_at, "data": admin_ids}
    return admin_ids
|
|
|
|
|
@Akeno(
    ~filters.scheduled & command(["ban", "dban", "hban"]) & filters.me & ~filters.forwarded
)
async def member_ban_user(client: Client, message: Message):
    """Handle the ``ban``, ``dban`` and ``hban`` commands.

    The target comes from the replied-to message or from the command
    arguments.  The ban is refused when the account lacks the
    ``can_restrict_members`` privilege, or when the target is the account
    itself, the hard-coded protected id, or a chat administrator.  ``dban``
    also deletes the replied-to message and ``hban`` calls
    ``delete_user_history`` for the target before banning.
    """
    user_id, reason = await extract_user_and_reason(message, sender_chat=True)
    rd = await message.edit_text("`Processing...`")
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_restrict_members:
        return await rd.edit_text("I don't have enough permissions")
    if not user_id:
        return await rd.edit_text("I can't find that user.")
    if user_id == client.me.id:
        return await rd.edit_text("I can't ban myself.")
    # Hard-coded id the handler refuses to act on (its message calls it the developer).
    if user_id == 1191668125:
        return await rd.edit_text("I can't ban my developer!")
    if user_id in (await list_admins(client, message.chat.id)):
        return await rd.edit_text("I can't ban an admin, You know the rules, so do i.")

    try:
        mention = (await client.get_users(user_id)).mention
    except IndexError:
        # The id did not resolve to a user; fall back to the title of the
        # chat the replied-to message was sent as.
        mention = (
            message.reply_to_message.sender_chat.title
            if message.reply_to_message
            else "Anon"
        )

    msg = (
        f"**Banned User:** {mention}\n"
        f"**Banned By:** {message.from_user.mention if message.from_user else 'Anon'}\n"
    )

    # The first letter of the command word selects the variant.
    variant = message.command[0][0]
    if variant == "d":
        # ``dban`` may be issued with an id/username instead of a reply; in
        # that case there is no message to delete.
        if message.reply_to_message:
            await message.reply_to_message.delete()
    elif variant == "h":
        await client.delete_user_history(message.chat.id, user_id)

    if reason:
        msg += f"**Reason:** {reason}"
    await message.chat.ban_member(user_id)
    await rd.edit_text(msg)
|
|
|
|
|
@Akeno(
    ~filters.scheduled & command(["unban"]) & filters.me & ~filters.forwarded
)
async def member_unban_user(client: Client, message: Message):
    """Handle the ``unban`` command.

    The target is the single command argument (username or numeric id) or,
    when no argument is given, the author of the replied-to message.
    """
    reply = message.reply_to_message
    rd = await message.edit_text("`Processing...`")
    # Look up this account's privileges in the chat, as the other handlers do.
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_restrict_members:
        return await rd.edit_text("I don't have enough permissions")
    # Compare ids; the chat object itself never equals an int.
    if reply and reply.sender_chat and reply.sender_chat.id != message.chat.id:
        return await rd.edit_text("You cannot unban a channel")

    if len(message.command) == 2:
        user = message.text.split(None, 1)[1]
        # Pass numeric ids as int: Pyrogram resolves a numeric *string* as a
        # phone number rather than as a user id.
        try:
            user = int(user)
        except ValueError:
            pass
    elif len(message.command) == 1 and reply and reply.from_user:
        user = reply.from_user.id
    else:
        return await rd.edit_text(
            "Provide a username or reply to a user's message to unban."
        )

    await message.chat.unban_member(user)
    umention = (await client.get_users(user)).mention
    await rd.edit_text(f"Unbanned! {umention}")
|
|
|
|
|
@Akeno(
    ~filters.scheduled & command(["pin", "unpin"]) & filters.me & ~filters.forwarded
)
async def pin_message(client: Client, message: Message):
    """Handle ``pin`` and ``unpin``: act on the replied-to message.

    Requires the ``can_pin_messages`` privilege.  Pinning is done with
    notifications disabled.
    """
    target = message.reply_to_message
    if not target:
        return await message.edit_text("Reply to a message to pin/unpin it.")

    status = await message.edit_text("`Processing...`")
    me_in_chat = await client.get_chat_member(message.chat.id, client.me.id)
    rights = me_in_chat.privileges
    if not (rights and rights.can_pin_messages):
        return await status.edit_text("I don't have enough permissions")

    # A command word starting with "u" is the unpin variant.
    wants_unpin = message.command[0][0] == "u"
    if wants_unpin:
        await target.unpin()
        return await status.edit_text(
            f"**Unpinned [this]({target.link}) message.**",
            disable_web_page_preview=True,
        )

    await target.pin(disable_notification=True)
    await status.edit_text(
        f"**Pinned [this]({target.link}) message.**",
        disable_web_page_preview=True,
    )
|
|
|
@Akeno(
    ~filters.scheduled & command(["mute", "dmute", "hmute"]) & filters.me & ~filters.forwarded
)
async def mute_user(client: Client, message: Message):
    """Handle the ``mute``, ``dmute`` and ``hmute`` commands.

    The target is restricted with an empty ``ChatPermissions()``.  The
    request is refused when the account lacks ``can_restrict_members`` or
    when the target is the account itself, the hard-coded protected id, or a
    chat administrator.  ``dmute`` also deletes the replied-to message and
    ``hmute`` calls ``delete_user_history`` for the target.
    """
    user_id, reason = await extract_user_and_reason(message)
    rd = await message.edit_text("`Processing...`")
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_restrict_members:
        return await rd.edit_text("I don't have enough permissions")
    if not user_id:
        return await rd.edit_text("I can't find that user.")
    if user_id == client.me.id:
        return await rd.edit_text("I can't mute myself.")
    # Hard-coded id the handler refuses to act on (its message calls it the developer).
    if user_id == 1191668125:
        return await rd.edit_text("I can't mute my developer!")
    if user_id in (await list_admins(client, message.chat.id)):
        return await rd.edit_text("I can't mute an admin, You know the rules, so do i.")

    mention = (await client.get_users(user_id)).mention
    msg = (
        f"**Muted User:** {mention}\n"
        f"**Muted By:** {message.from_user.mention if message.from_user else 'Anon'}\n"
    )

    # The first letter of the command word selects the variant.
    variant = message.command[0][0]
    if variant == "d":
        # ``dmute`` may be issued with an id/username instead of a reply; in
        # that case there is no message to delete.
        if message.reply_to_message:
            await message.reply_to_message.delete()
    elif variant == "h":
        await client.delete_user_history(message.chat.id, user_id)

    if reason:
        msg += f"**Reason:** {reason}"
    await message.chat.restrict_member(user_id, permissions=ChatPermissions())
    await rd.edit_text(msg)
|
|
|
@Akeno(
    ~filters.scheduled & command(["unmute"]) & filters.me & ~filters.forwarded
)
async def unmute_user(client: Client, message: Message):
    """Handle ``unmute``: restore ``unmute_permissions`` for the target.

    Requires the ``can_restrict_members`` privilege.
    """
    target_id = await extract_user(message)
    status = await message.edit_text("`Processing...`")

    me_in_chat = await client.get_chat_member(message.chat.id, client.me.id)
    rights = me_in_chat.privileges
    if not (rights and rights.can_restrict_members):
        return await status.edit_text("I don't have enough permissions")
    if not target_id:
        return await status.edit_text("I can't find that user.")

    await message.chat.restrict_member(target_id, permissions=unmute_permissions)
    unmuted = await client.get_users(target_id)
    await status.edit_text(f"Unmuted! {unmuted.mention}")
|
|
|
@Akeno(
    ~filters.scheduled & command(["kick", "dkick", "hkick"]) & filters.me & ~filters.forwarded
)
async def kick_user(client: Client, message: Message):
    """Handle the ``kick``, ``dkick`` and ``hkick`` commands.

    A kick is a ban followed, one second later, by an unban.  The request is
    refused when the account lacks ``can_restrict_members`` or when the
    target is the account itself, the hard-coded protected id, or a chat
    administrator.  ``dkick`` also deletes the replied-to message and
    ``hkick`` calls ``delete_user_history`` for the target.
    """
    user_id, reason = await extract_user_and_reason(message)
    rd = await message.edit_text("`Processing...`")
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_restrict_members:
        return await rd.edit_text("I don't have enough permissions")
    if not user_id:
        return await rd.edit_text("I can't find that user.")
    if user_id == client.me.id:
        return await rd.edit_text("I can't kick myself.")
    # Hard-coded id the handler refuses to act on (its message calls it the developer).
    if user_id == 1191668125:
        return await rd.edit_text("I can't kick my developer.")
    if user_id in (await list_admins(client, message.chat.id)):
        return await rd.edit_text("I can't kick an admin, You know the rules, so do i.")

    mention = (await client.get_users(user_id)).mention
    msg = (
        "\n"
        f"**Kicked User:** {mention}\n"
        f"**Kicked By:** {message.from_user.mention if message.from_user else 'Anon'}"
    )

    # The first letter of the command word selects the variant.
    variant = message.command[0][0]
    if variant == "d":
        # ``dkick`` may be issued with an id/username instead of a reply; in
        # that case there is no message to delete.
        if message.reply_to_message:
            await message.reply_to_message.delete()
    elif variant == "h":
        await client.delete_user_history(message.chat.id, user_id)

    if reason:
        msg += f"\n**Reason:** `{reason}`"

    try:
        await message.chat.ban_member(user_id)
        await rd.edit_text(msg)
        # Short pause between the ban and the unban that completes the kick.
        await asyncio.sleep(1)
        await message.chat.unban_member(user_id)
    except ChatAdminRequired:
        return await rd.edit_text("**Maaf Anda Bukan admin**")
|
|
|
@Akeno(
    ~filters.scheduled & command(["promote", "fullpromote"]) & filters.me & ~filters.forwarded
)
async def promotte_user(client: Client, message: Message):
    """Handle the ``promote`` and ``fullpromote`` commands.

    Both variants grant the same administrator privileges;
    ``fullpromote`` additionally grants ``can_promote_members``.  Requires
    the ``can_promote_members`` privilege on this account.
    """
    user_id = await extract_user(message)
    rd = await message.edit_text("`Processing...`")
    # Validate the target before asking the client about it; looking up a
    # missing id first would fail before this reply could be sent.
    if not user_id:
        return await rd.edit_text("I can't find that user.")
    bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
    if not bot or not bot.can_promote_members:
        return await rd.edit_text("I don't have enough permissions")

    umention = (await client.get_users(user_id)).mention

    # A command word starting with "f" is the fullpromote variant.
    full = message.command[0][0] == "f"
    await message.chat.promote_member(
        user_id,
        privileges=ChatPrivileges(
            can_manage_chat=True,
            can_delete_messages=True,
            can_manage_video_chats=True,
            can_restrict_members=True,
            can_change_info=True,
            can_invite_users=True,
            can_pin_messages=True,
            # The only privilege that differs between the two variants.
            can_promote_members=full,
        ),
    )
    if full:
        return await rd.edit_text(f"Fully Promoted! {umention}")
    await rd.edit_text(f"Promoted! {umention}")
|
|
|
@Akeno(
    ~filters.scheduled & command(["demote"]) & filters.me & ~filters.forwarded
)
async def demote_user(client: Client, message: Message):
    """Handle ``demote``: re-promote the target with every privilege off.

    Refuses to act on the account itself and requires the
    ``can_promote_members`` privilege.
    """
    target_id = await extract_user(message)
    status = await message.edit_text("`Processing...`")
    if not target_id:
        return await status.edit_text("I can't find that user.")
    if target_id == client.me.id:
        return await status.edit_text("I can't demote myself.")

    me_in_chat = await client.get_chat_member(message.chat.id, client.me.id)
    rights = me_in_chat.privileges
    if not (rights and rights.can_promote_members):
        return await status.edit_text("I don't have enough permissions")

    # Every privilege explicitly switched off.
    no_rights = ChatPrivileges(
        can_manage_chat=False,
        can_delete_messages=False,
        can_manage_video_chats=False,
        can_restrict_members=False,
        can_change_info=False,
        can_invite_users=False,
        can_pin_messages=False,
        can_promote_members=False,
    )
    await message.chat.promote_member(target_id, privileges=no_rights)

    demoted = await client.get_users(target_id)
    await status.edit_text(f"Demoted! {demoted.mention}")
|
|
|
# Register the help entries of the "admin" module: one
# [command usage, description] pair per command.
add_command_help(
    "admin",
    [
        ["ban [reply/username/userid]", "Ban someone."],
        ["dban [reply]", "dban a user deleting the replied to message."],
        ["unban [reply/username/userid]", "Unban someone."],
        ["kick [reply/username/userid]", "kick out someone from your group."],
        ["dkick [reply]", "dkick a user deleting the replied to message."],
        ["promote `or` .fullpromote", "Promote someonen."],
        ["demote", "Demote someone."],
        ["mute [reply/username/userid]", "Mute someone."],
        ["dmute [reply]", "dmute a user deleting the replied to message."],
        ["unmute [reply/username/userid]", "Unmute someone."],
        ["pin [reply]", "to pin any message."],
        ["unpin [reply]", "To unpin any message."],
        ["setgpic [reply ke image]", "To set an group profile pic."],
    ],
)
|
# Register the same commands with the ``modules_help`` registry under the
# "admin" module, tied to this file.
module = modules_help.add_module("admin", __file__)

# Ban family.
module.add_command("ban", "Ban someone.")
module.add_command("dban", "dban a user deleting the replied to message")
module.add_command("hban", "hban a user deleting all the replied to message")

# Kick family.
module.add_command("kick", "kick out someone from your group")
module.add_command("dkick", "dkick a user deleting the replied to message")
module.add_command("hkick", "hkick a user deleting all the replied to message")

# Promotion / demotion.
module.add_command("promote", "Promote someonen")
module.add_command("demote", "Demote someone")

# Mute family.
module.add_command("mute", "Mute someone")
module.add_command("dmute", "dmute a user deleting the replied to message")
module.add_command("hmute", "dmute a user deleting all the replied to message")

# Pinning and group picture.
module.add_command("pin", "to pin any message.")
module.add_command("unpin", "To unpin any message")
module.add_command("setgpic", "To set an group profile pic.")
|
|