|
import asyncio |
|
import contextlib |
|
import math |
|
import os |
|
import shlex |
|
import shutil |
|
import textwrap |
|
import time |
|
from io import BytesIO |
|
from typing import Tuple |
|
|
|
import cv2 |
|
import requests |
|
from bs4 import BeautifulSoup as bs |
|
from git import Repo |
|
from git.exc import GitCommandError, InvalidGitRepositoryError, NoSuchPathError |
|
from PIL import Image, ImageDraw, ImageFont |
|
from pymediainfo import MediaInfo |
|
from pyrogram import * |
|
from pyrogram.enums import * |
|
from pyrogram.errors import * |
|
from pyrogram.raw.functions.messages import * |
|
from pyrogram.raw.types import * |
|
from pyrogram.types import * |
|
from pyrogram.types import Message |
|
|
|
from akn.utils.formatter import humanbytes, readable_time |
|
|
|
DEVS = [1191668125] |
|
|
|
def global_no_spam_title(message: Message): |
|
chat = message.chat |
|
if chat is not None and hasattr(chat, "title") and chat.title is not None: |
|
if ( |
|
"#nodevs" in chat.title.lower() |
|
and message.from_user.status |
|
not in [enums.ChatMemberStatus.ADMINISTRATOR, enums.ChatMemberStatus.OWNER] |
|
and message.from_user.id not in DEVS |
|
): |
|
return True |
|
|
|
async def add_text_img(image_path, text): |
|
font_size = 12 |
|
stroke_width = 1 |
|
|
|
if ";" in text: |
|
upper_text, lower_text = text.split(";") |
|
else: |
|
upper_text = text |
|
lower_text = "" |
|
|
|
img = Image.open(image_path).convert("RGBA") |
|
img_info = img.info |
|
image_width, image_height = img.size |
|
font = ImageFont.truetype( |
|
font="resources/fonts/default.ttf", |
|
size=int(image_height * font_size) // 100, |
|
) |
|
draw = ImageDraw.Draw(img) |
|
|
|
char_width, char_height = font.getsize("A") |
|
chars_per_line = image_width // char_width |
|
top_lines = textwrap.wrap(upper_text, width=chars_per_line) |
|
bottom_lines = textwrap.wrap(lower_text, width=chars_per_line) |
|
|
|
if top_lines: |
|
y = 10 |
|
for line in top_lines: |
|
line_width, line_height = font.getsize(line) |
|
x = (image_width - line_width) / 2 |
|
draw.text( |
|
(x, y), |
|
line, |
|
fill="white", |
|
font=font, |
|
stroke_width=stroke_width, |
|
stroke_fill="black", |
|
) |
|
y += line_height |
|
|
|
if bottom_lines: |
|
y = image_height - char_height * len(bottom_lines) - 15 |
|
for line in bottom_lines: |
|
line_width, line_height = font.getsize(line) |
|
x = (image_width - line_width) / 2 |
|
draw.text( |
|
(x, y), |
|
line, |
|
fill="white", |
|
font=font, |
|
stroke_width=stroke_width, |
|
stroke_fill="black", |
|
) |
|
y += line_height |
|
|
|
final_image = os.path.join("memify.webp") |
|
img.save(final_image, **img_info) |
|
return final_image |
|
|
|
|
|
|
|
async def bash(cmd): |
|
process = await asyncio.create_subprocess_shell( |
|
cmd, |
|
stdout=asyncio.subprocess.PIPE, |
|
stderr=asyncio.subprocess.PIPE, |
|
) |
|
stdout, stderr = await process.communicate() |
|
err = stderr.decode().strip() |
|
out = stdout.decode().strip() |
|
return out, err |
|
|
|
async def resize_media(media: str, video: bool, fast_forward: bool) -> str: |
|
if video: |
|
info_ = Media_Info.data(media) |
|
width = info_["pixel_sizes"][0] |
|
height = info_["pixel_sizes"][1] |
|
sec = info_["duration_in_ms"] |
|
s = round(float(sec)) / 1000 |
|
|
|
if height == width: |
|
height, width = 512, 512 |
|
elif height > width: |
|
height, width = 512, -1 |
|
elif width > height: |
|
height, width = -1, 512 |
|
|
|
resized_video = f"{media}.webm" |
|
if fast_forward: |
|
if s > 3: |
|
fract_ = 3 / s |
|
ff_f = round(fract_, 2) |
|
set_pts_ = ff_f - 0.01 if ff_f > fract_ else ff_f |
|
cmd_f = f"-filter:v 'setpts={set_pts_}*PTS',scale={width}:{height}" |
|
else: |
|
cmd_f = f"-filter:v scale={width}:{height}" |
|
else: |
|
cmd_f = f"-filter:v scale={width}:{height}" |
|
fps_ = float(info_["frame_rate"]) |
|
fps_cmd = "-r 30 " if fps_ > 30 else "" |
|
cmd = f"ffmpeg -i {media} {cmd_f} -ss 00:00:00 -to 00:00:03 -an -c:v libvpx-vp9 {fps_cmd}-fs 256K {resized_video}" |
|
_, error, __, ___ = await run_cmd(cmd) |
|
os.remove(media) |
|
return resized_video |
|
|
|
image = Image.open(media) |
|
maxsize = 512 |
|
scale = maxsize / max(image.width, image.height) |
|
new_size = (int(image.width * scale), int(image.height * scale)) |
|
|
|
image = image.resize(new_size, Image.LANCZOS) |
|
resized_photo = "sticker.png" |
|
image.save(resized_photo) |
|
os.remove(media) |
|
return resized_photo |
|
|
|
def get_text(message: Message) -> [None, str]: |
|
"""Extract Text From Commands""" |
|
text_to_return = message.text |
|
if message.text is None: |
|
return None |
|
if " " in text_to_return: |
|
try: |
|
return message.text.split(None, 1)[1] |
|
except IndexError: |
|
return None |
|
else: |
|
return None |
|
|
|
def get_arg(message: Message): |
|
msg = message.text |
|
msg = msg.replace(" ", "", 1) if msg[1] == " " else msg |
|
split = msg[1:].replace("\n", " \n").split(" ") |
|
if " ".join(split[1:]).strip() == "": |
|
return "" |
|
return " ".join(split[1:]) |
|
|
|
def get_args(message: Message): |
|
try: |
|
message = message.text |
|
except AttributeError: |
|
pass |
|
if not message: |
|
return False |
|
message = message.split(maxsplit=1) |
|
if len(message) <= 1: |
|
return [] |
|
message = message[1] |
|
try: |
|
split = shlex.split(message) |
|
except ValueError: |
|
return message |
|
return list(filter(lambda x: len(x) > 0, split)) |
|
|
|
async def run_cmd(cmd: str) -> Tuple[str, str, int, int]: |
|
"""Run Commands""" |
|
args = shlex.split(cmd) |
|
process = await asyncio.create_subprocess_exec( |
|
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE |
|
) |
|
stdout, stderr = await process.communicate() |
|
return ( |
|
stdout.decode("utf-8", "replace").strip(), |
|
stderr.decode("utf-8", "replace").strip(), |
|
process.returncode, |
|
process.pid, |
|
) |
|
|
|
async def convert_to_image(message, client) -> [None, str]: |
|
"""Convert Most Media Formats To Raw Image""" |
|
if not message: |
|
return None |
|
if not message.reply_to_message: |
|
return None |
|
final_path = None |
|
if not ( |
|
message.reply_to_message.video |
|
or message.reply_to_message.photo |
|
or message.reply_to_message.sticker |
|
or message.reply_to_message.media |
|
or message.reply_to_message.animation |
|
or message.reply_to_message.audio |
|
): |
|
return None |
|
if message.reply_to_message.photo: |
|
final_path = await message.reply_to_message.download() |
|
elif message.reply_to_message.sticker: |
|
if message.reply_to_message.sticker.mime_type == "image/webp": |
|
final_path = "webp_to_png_s_proton.png" |
|
path_s = await message.reply_to_message.download() |
|
im = Image.open(path_s) |
|
im.save(final_path, "PNG") |
|
else: |
|
path_s = await client.download_media(message.reply_to_message) |
|
final_path = "lottie_proton.png" |
|
cmd = ( |
|
f"lottie_convert.py --frame 0 -if lottie -of png {path_s} {final_path}" |
|
) |
|
await run_cmd(cmd) |
|
elif message.reply_to_message.audio: |
|
thumb = message.reply_to_message.audio.thumbs[0].file_id |
|
final_path = await client.download_media(thumb) |
|
elif message.reply_to_message.video or message.reply_to_message.animation: |
|
final_path = "fetched_thumb.png" |
|
vid_path = await client.download_media(message.reply_to_message) |
|
await run_cmd(f"ffmpeg -i {vid_path} -filter:v scale=500:500 -an {final_path}") |
|
return final_path |
|
|
|
async def get_ub_chats( |
|
client: Client, |
|
chat_types: list = [ |
|
enums.ChatType.GROUP, |
|
enums.ChatType.SUPERGROUP, |
|
enums.ChatType.CHANNEL, |
|
], |
|
is_id_only=True, |
|
): |
|
ub_chats = [] |
|
async for dialog in client.get_dialogs(): |
|
if dialog.chat.type in chat_types: |
|
if is_id_only: |
|
ub_chats.append(dialog.chat.id) |
|
else: |
|
ub_chats.append(dialog.chat) |
|
else: |
|
continue |
|
return ub_chats |
|
|
|
def ReplyCheck(message: Message): |
|
reply_id = None |
|
|
|
if message.reply_to_message: |
|
reply_id = message.reply_to_message.id |
|
|
|
elif not message.from_user.is_self: |
|
reply_id = message.id |
|
|
|
return reply_id |
|
|
|
def SpeedConvert(size): |
|
power = 2**10 |
|
zero = 0 |
|
units = {0: "", 1: "Kbit/s", 2: "Mbit/s", 3: "Gbit/s", 4: "Tbit/s"} |
|
while size > power: |
|
size /= power |
|
zero += 1 |
|
return f"{round(size, 2)} {units[zero]}" |
|
|
|
def GetFromUserID(message: Message): |
|
return message.from_user.id |
|
|
|
def GetChatID(message: Message): |
|
return message.chat.id |
|
|
|
def GetUserMentionable(user: User): |
|
if user.username: |
|
username = "@{}".format(user.username) |
|
else: |
|
if user.last_name: |
|
name_string = "{} {}".format(user.first_name, user.last_name) |
|
else: |
|
name_string = "{}".format(user.first_name) |
|
|
|
username = "<a href='tg://user?id={}'>{}</a>".format(user.id, name_string) |
|
|
|
return username |
|
|
|
def resize_image(image): |
|
im = Image.open(image) |
|
maxsize = (512, 512) |
|
if (im.width and im.height) < 512: |
|
size1 = im.width |
|
size2 = im.height |
|
if im.width > im.height: |
|
scale = 512 / size1 |
|
size1new = 512 |
|
size2new = size2 * scale |
|
else: |
|
scale = 512 / size2 |
|
size1new = size1 * scale |
|
size2new = 512 |
|
size1new = math.floor(size1new) |
|
size2new = math.floor(size2new) |
|
sizenew = (size1new, size2new) |
|
im = im.resize(sizenew) |
|
else: |
|
im.thumbnail(maxsize) |
|
file_name = "Sticker.png" |
|
im.save(file_name, "PNG") |
|
if os.path.exists(image): |
|
os.remove(image) |
|
return file_name |
|
|
|
class Media_Info: |
|
def data(media: str) -> dict: |
|
"Get downloaded media's information" |
|
found = False |
|
media_info = MediaInfo.parse(media) |
|
for track in media_info.tracks: |
|
if track.track_type == "Video": |
|
found = True |
|
type_ = track.track_type |
|
format_ = track.format |
|
duration_1 = track.duration |
|
other_duration_ = track.other_duration |
|
duration_2 = ( |
|
f"{other_duration_[0]} - ({other_duration_[3]})" |
|
if other_duration_ |
|
else None |
|
) |
|
pixel_ratio_ = [track.width, track.height] |
|
aspect_ratio_1 = track.display_aspect_ratio |
|
other_aspect_ratio_ = track.other_display_aspect_ratio |
|
aspect_ratio_2 = other_aspect_ratio_[0] if other_aspect_ratio_ else None |
|
fps_ = track.frame_rate |
|
fc_ = track.frame_count |
|
media_size_1 = track.stream_size |
|
other_media_size_ = track.other_stream_size |
|
media_size_2 = ( |
|
[ |
|
other_media_size_[1], |
|
other_media_size_[2], |
|
other_media_size_[3], |
|
other_media_size_[4], |
|
] |
|
if other_media_size_ |
|
else None |
|
) |
|
|
|
dict_ = ( |
|
{ |
|
"media_type": type_, |
|
"format": format_, |
|
"duration_in_ms": duration_1, |
|
"duration": duration_2, |
|
"pixel_sizes": pixel_ratio_, |
|
"aspect_ratio_in_fraction": aspect_ratio_1, |
|
"aspect_ratio": aspect_ratio_2, |
|
"frame_rate": fps_, |
|
"frame_count": fc_, |
|
"file_size_in_bytes": media_size_1, |
|
"file_size": media_size_2, |
|
} |
|
if found |
|
else None |
|
) |
|
return dict_ |
|
|
|
async def get_files_from_directory(directory: str): |
|
all_files = [] |
|
for path, _, files in os.walk(directory): |
|
for file in files: |
|
all_files.append(os.path.join(path, file)) |
|
return all_files |
|
|
|
async def runcmd(cmd: str) -> Tuple[str, str, int, int]: |
|
args = shlex.split(cmd) |
|
process = await asyncio.create_subprocess_exec( |
|
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE |
|
) |
|
stdout, stderr = await process.communicate() |
|
return ( |
|
stdout.decode("utf-8", "replace").strip(), |
|
stderr.decode("utf-8", "replace").strip(), |
|
process.returncode, |
|
process.pid, |
|
) |
|
|
|
async def update_dotenv(key: str, value: str) -> None: |
|
with open(".env", "r") as file: |
|
data = file.readlines() |
|
for index, line in enumerate(data): |
|
if line.startswith(f"{key}="): |
|
data[index] = f"{key}={value}\n" |
|
break |
|
with open(".env", "w") as file: |
|
file.writelines(data) |
|
|
|
async def gen_changelogs(repo: Repo, branch: str) -> str: |
|
changelogs = "" |
|
commits = list(repo.iter_commits(branch))[:5] |
|
for index, commit in enumerate(commits): |
|
changelogs += f"**{index + 1}.** `{commit.summary}`\n" |
|
return changelogs |
|
|
|
async def initialize_git(git_repo: str): |
|
force = False |
|
try: |
|
repo = Repo() |
|
except NoSuchPathError as pathErr: |
|
repo.__del__() |
|
return False, pathErr, force |
|
except GitCommandError as gitErr: |
|
repo.__del__() |
|
return False, gitErr, force |
|
except InvalidGitRepositoryError: |
|
repo = Repo.init() |
|
origin = repo.create_remote("upstream", f"https://github.com/{git_repo}") |
|
origin.fetch() |
|
repo.create_head("master", origin.refs.master) |
|
repo.heads.master.set_tracking_branch(origin.refs.master) |
|
repo.heads.master.checkout(True) |
|
force = True |
|
with contextlib.suppress(BaseException): |
|
repo.create_remote("upstream", f"https://github.com/{git_repo}") |
|
return True, repo, force |
|
|