akn-dev / akn /Akeno /admin.py
randydev's picture
Fix permission admins
9e1a793 verified
# Copyright (C) 2020-2023 TeamKillerX <https://github.com/TeamKillerX>
#
# This file is part of TeamKillerX project,
# and licensed under GNU Affero General Public License v3.
# See the GNU Affero General Public License for more details.
#
# All rights reserved. See COPYING, AUTHORS.
#
import asyncio
import time
from time import time as waktu
from pyrogram import Client
from pyrogram import Client as ren
from pyrogram import *
from pyrogram import filters
from pyrogram.errors import *
from pyrogram.types import *
from akn.utils.handler import *
from akn.Akeno.help import add_command_help
from akn.utils.logger import LOGS
from akn.utils.prefixprem import command
from config import *
admins_in_chat = {}
unmute_permissions = ChatPermissions(
can_send_messages=True,
can_send_media_messages=True,
can_send_polls=True,
can_change_info=False,
can_invite_users=True,
can_pin_messages=False,
)
async def extract_userid(message, text: str):
def is_int(text: str):
try:
int(text)
except ValueError:
return False
return True
text = text.strip()
if is_int(text):
return int(text)
entities = message.entities
app = message._client
if len(entities) < 2:
return (await app.get_users(text)).id
entity = entities[1]
if entity.type == "mention":
return (await app.get_users(text)).id
if entity.type == "text_mention":
return entity.user.id
return None
async def extract_user_and_reason(message, sender_chat=False):
args = message.text.strip().split()
text = message.text
user = None
reason = None
if message.reply_to_message:
reply = message.reply_to_message
if not reply.from_user:
if (
reply.sender_chat
and reply.sender_chat != message.chat.id
and sender_chat
):
id_ = reply.sender_chat.id
else:
return None, None
else:
id_ = reply.from_user.id
if len(args) < 2:
reason = None
else:
reason = text.split(None, 1)[1]
return id_, reason
if len(args) == 2:
user = text.split(None, 1)[1]
return await extract_userid(message, user), None
if len(args) > 2:
user, reason = text.split(None, 2)[1:]
return await extract_userid(message, user), reason
return user, reason
async def extract_user(message):
return (await extract_user_and_reason(message))[0]
async def list_admins(client: Client, chat_id: int):
global admins_in_chat
if chat_id in admins_in_chat:
interval = time() - admins_in_chat[chat_id]["last_updated_at"]
if interval < 3600:
return admins_in_chat[chat_id]["data"]
admins_in_chat[chat_id] = {
"last_updated_at": waktu(),
"data": [
member.user.id
async for member in client.get_chat_members(
chat_id, filter=enums.ChatMembersFilter.ADMINISTRATORS
)
],
}
return admins_in_chat[chat_id]["data"]
@Akeno(
~filters.scheduled & command(["ban", "dban", "hban"]) & filters.me & ~filters.forwarded
)
async def member_ban_user(client: Client, message: Message):
user_id, reason = await extract_user_and_reason(message, sender_chat=True)
rd = await message.edit_text("`Processing...`")
bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
if not bot or not bot.can_restrict_members:
return await rd.edit_text("I don't have enough permissions")
if not user_id:
return await rd.edit_text("I can't find that user.")
if user_id == client.me.id:
return await rd.edit_text("I can't ban myself.")
if user_id == 1191668125:
return await rd.edit_text("I can't ban my developer!")
if user_id in (await list_admins(client, message.chat.id)):
return await rd.edit_text("I can't ban an admin, You know the rules, so do i.")
try:
mention = (await client.get_users(user_id)).mention
except IndexError:
mention = (
message.reply_to_message.sender_chat.title
if message.reply_to_message
else "Anon"
)
msg = (
f"**Banned User:** {mention}\n"
f"**Banned By:** {message.from_user.mention if message.from_user else 'Anon'}\n"
)
if message.command[0][0] == "d":
await message.reply_to_message.delete()
elif message.command[0][0] == "h":
await client.delete_user_history(message.chat.id, user_id)
if reason:
msg += f"**Reason:** {reason}"
await message.chat.ban_member(user_id)
await rd.edit_text(msg)
@Akeno(
~filters.scheduled & command(["unban"]) & filters.me & ~filters.forwarded
)
async def member_unban_user(client: Client, message: Message):
reply = message.reply_to_message
rd = await message.edit_text("`Processing...`")
if not bot or not bot.can_restrict_members:
return await rd.edit_text("I don't have enough permissions")
if reply and reply.sender_chat and reply.sender_chat != message.chat.id:
return await rd.edit_text("You cannot unban a channel")
if len(message.command) == 2:
user = message.text.split(None, 1)[1]
elif len(message.command) == 1 and reply:
user = message.reply_to_message.from_user.id
else:
return await rd.edit_text(
"Provide a username or reply to a user's message to unban."
)
await message.chat.unban_member(user)
umention = (await client.get_users(user)).mention
await rd.edit_text(f"Unbanned! {umention}")
@Akeno(
~filters.scheduled & command(["pin", "unpin"]) & filters.me & ~filters.forwarded
)
async def pin_message(client: Client, message: Message):
if not message.reply_to_message:
return await message.edit_text("Reply to a message to pin/unpin it.")
rd = await message.edit_text("`Processing...`")
bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
if not bot or not bot.can_pin_messages:
return await rd.edit_text("I don't have enough permissions")
r = message.reply_to_message
if message.command[0][0] == "u":
await r.unpin()
return await rd.edit_text(
f"**Unpinned [this]({r.link}) message.**",
disable_web_page_preview=True,
)
await r.pin(disable_notification=True)
await rd.edit_text(
f"**Pinned [this]({r.link}) message.**",
disable_web_page_preview=True,
)
@Akeno(
~filters.scheduled & command(["mute", "dmute", "hmute"]) & filters.me & ~filters.forwarded
)
async def mute_user(client: Client, message: Message):
user_id, reason = await extract_user_and_reason(message)
rd = await message.edit_text("`Processing...`")
bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
if not bot or not bot.can_restrict_members:
return await rd.edit_text("I don't have enough permissions")
if not user_id:
return await rd.edit_text("I can't find that user.")
if user_id == client.me.id:
return await rd.edit_text("I can't mute myself.")
if user_id == 1191668125:
return await rd.edit("I can't mute my developer!")
if user_id in (await list_admins(client, message.chat.id)):
return await rd.edit_text("I can't mute an admin, You know the rules, so do i.")
mention = (await client.get_users(user_id)).mention
msg = (
f"**Muted User:** {mention}\n"
f"**Muted By:** {message.from_user.mention if message.from_user else 'Anon'}\n"
)
if message.command[0][0] == "d":
await message.reply_to_message.delete()
elif message.command[0][0] == "h":
await client.delete_user_history(message.chat.id, user_id)
if reason:
msg += f"**Reason:** {reason}"
await message.chat.restrict_member(user_id, permissions=ChatPermissions())
await rd.edit_text(msg)
@Akeno(
~filters.scheduled & command(["unmute"]) & filters.me & ~filters.forwarded
)
async def unmute_user(client: Client, message: Message):
user_id = await extract_user(message)
rd = await message.edit_text("`Processing...`")
bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
if not bot or not bot.can_restrict_members:
return await rd.edit_text("I don't have enough permissions")
if not user_id:
return await rd.edit_text("I can't find that user.")
await message.chat.restrict_member(user_id, permissions=unmute_permissions)
umention = (await client.get_users(user_id)).mention
await rd.edit_text(f"Unmuted! {umention}")
@Akeno(
~filters.scheduled & command(["kick", "dkick", "hkick"]) & filters.me & ~filters.forwarded
)
async def kick_user(client: Client, message: Message):
user_id, reason = await extract_user_and_reason(message)
rd = await message.edit_text("`Processing...`")
bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
if not bot or not bot.can_restrict_members:
return await rd.edit_text("I don't have enough permissions")
if not user_id:
return await rd.edit_text("I can't find that user.")
if user_id == client.me.id:
return await rd.edit_text("I can't kick myself.")
if user_id == 1191668125:
return await rd.edit_text("I can't kick my developer.")
if user_id in (await list_admins(client, message.chat.id)):
return await rd.edit_text("I can't kick an admin, You know the rules, so do i.")
mention = (await client.get_users(user_id)).mention
msg = f"""
**Kicked User:** {mention}
**Kicked By:** {message.from_user.mention if message.from_user else 'Anon'}"""
if message.command[0][0] == "d":
await message.reply_to_message.delete()
elif message.command[0][0] == "h":
await client.delete_user_history(message.chat.id, user_id)
if reason:
msg += f"\n**Reason:** `{reason}`"
try:
await message.chat.ban_member(user_id)
await rd.edit_text(msg)
await asyncio.sleep(1)
await message.chat.unban_member(user_id)
except ChatAdminRequired:
return await rd.edit_text("**Maaf Anda Bukan admin**")
@Akeno(
~filters.scheduled & command(["promote", "fullpromote"]) & filters.me & ~filters.forwarded
)
async def promotte_user(client: Client, message: Message):
user_id = await extract_user(message)
umention = (await client.get_users(user_id)).mention
rd = await message.edit_text("`Processing...`")
if not user_id:
return await rd.edit_text("I can't find that user.")
bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
if not bot or not bot.can_promote_members:
return await rd.edit_text("I don't have enough permissions")
if message.command[0][0] == "f":
await message.chat.promote_member(
user_id,
privileges=ChatPrivileges(
can_manage_chat=True,
can_delete_messages=True,
can_manage_video_chats=True,
can_restrict_members=True,
can_change_info=True,
can_invite_users=True,
can_pin_messages=True,
can_promote_members=True,
),
)
return await rd.edit_text(f"Fully Promoted! {umention}")
await message.chat.promote_member(
user_id,
privileges=ChatPrivileges(
can_manage_chat=True,
can_delete_messages=True,
can_manage_video_chats=True,
can_restrict_members=True,
can_change_info=True,
can_invite_users=True,
can_pin_messages=True,
can_promote_members=False,
),
)
await rd.edit_text(f"Promoted! {umention}")
@Akeno(
~filters.scheduled & command(["demote"]) & filters.me & ~filters.forwarded
)
async def demote_user(client: Client, message: Message):
user_id = await extract_user(message)
rd = await message.edit_text("`Processing...`")
if not user_id:
return await rd.edit_text("I can't find that user.")
if user_id == client.me.id:
return await rd.edit_text("I can't demote myself.")
bot = (await client.get_chat_member(message.chat.id, client.me.id)).privileges
if not bot or not bot.can_promote_members:
return await rd.edit_text("I don't have enough permissions")
await message.chat.promote_member(
user_id,
privileges=ChatPrivileges(
can_manage_chat=False,
can_delete_messages=False,
can_manage_video_chats=False,
can_restrict_members=False,
can_change_info=False,
can_invite_users=False,
can_pin_messages=False,
can_promote_members=False,
),
)
umention = (await client.get_users(user_id)).mention
await rd.edit_text(f"Demoted! {umention}")
add_command_help(
"admin",
[
["ban [reply/username/userid]", "Ban someone."],
["dban [reply]", "dban a user deleting the replied to message."],
["unban [reply/username/userid]", "Unban someone."],
["kick [reply/username/userid]", "kick out someone from your group."],
["dkick [reply]", "dkick a user deleting the replied to message."],
["promote `or` .fullpromote", "Promote someonen."],
["demote", "Demote someone."],
["mute [reply/username/userid]", "Mute someone."],
["dmute [reply]", "dmute a user deleting the replied to message."],
["unmute [reply/username/userid]", "Unmute someone."],
["pin [reply]", "to pin any message."],
["unpin [reply]", "To unpin any message."],
["setgpic [reply ke image]", "To set an group profile pic."],
],
)
module = modules_help.add_module("admin", __file__)
module.add_command("ban", "Ban someone.")
module.add_command("dban", "dban a user deleting the replied to message")
module.add_command("hban", "hban a user deleting all the replied to message")
module.add_command("kick", "kick out someone from your group")
module.add_command("dkick", "dkick a user deleting the replied to message")
module.add_command("hkick", "hkick a user deleting all the replied to message")
module.add_command("promote", "Promote someonen")
module.add_command("demote", "Demote someone")
module.add_command("mute", "Mute someone")
module.add_command("dmute", "dmute a user deleting the replied to message")
module.add_command("hmute", "dmute a user deleting all the replied to message")
module.add_command("pin", "to pin any message.")
module.add_command("unpin", "To unpin any message")
module.add_command("setgpic", "To set an group profile pic.")