File size: 2,691 Bytes
21bc372 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
import re
from typing import Union, List
from pyrogram import Client
from pyrogram.filters import Filter, create
from pyrogram.types import Message
from akn.utils.base_sqlite import get_prefix
from pyrogram.enums import MessageEntityType
def command(commands: Union[str, List[str]], case_sensitive: bool = False):
command_re = re.compile(
r"([\"'])(.*?)(?<!\\)\1|(\S+)",
flags=re.UNICODE,
)
async def func(flt, client: Client, message: Message):
username = client.me.username or ""
user_id = client.me.id if client.me else 0
text = message.text or message.caption
message.command = None
if not text:
return False
stored_prefix = await get_prefix(user_id)
if stored_prefix is None:
stored_prefix = "."
if stored_prefix == "None":
without_prefix = text.strip()
return await process_command(flt, client, message, without_prefix, command_re, username)
if message.entities:
for entity in message.entities:
if entity.type == MessageEntityType.CUSTOM_EMOJI and str(entity.custom_emoji_id) == stored_prefix:
without_prefix = text[entity.length:].strip()
return await process_command(flt, client, message, without_prefix, command_re, username)
if text.startswith(stored_prefix):
without_prefix = text[len(stored_prefix):].strip()
return await process_command(flt, client, message, without_prefix, command_re, username)
return False
commands = commands if isinstance(commands, list) else [commands]
commands = {c if case_sensitive else c.lower() for c in commands}
return create(func, "CommandFilter", commands=commands, case_sensitive=case_sensitive)
async def process_command(flt, client: Client, message: Message, without_prefix: str, command_re, username: str):
"""Helper function to process command after prefix (if any) is removed."""
for cmd in flt.commands:
if re.match(
f"^(?:{cmd}(?:@?{username})?)(?:\s|$)",
without_prefix,
flags=0 if flt.case_sensitive else re.IGNORECASE,
):
without_command = re.sub(
f"{cmd}(?:@?{username})?\s?",
"",
without_prefix,
count=1,
flags=0 if flt.case_sensitive else re.IGNORECASE,
)
message.command = [cmd] + [
re.sub(r"\\([\"'])", r"\1", m.group(2) or m.group(3) or "")
for m in command_re.finditer(without_command)
]
return True
return False |