"""Shared helpers: media conversion, command-argument parsing, subprocess
execution, and git / ``.env`` maintenance."""

import asyncio
import contextlib
import math
import os
import shlex
import shutil
import textwrap
import time
from io import BytesIO
from typing import Optional, Tuple

import cv2
import requests
from bs4 import BeautifulSoup as bs
from git import Repo
from git.exc import GitCommandError, InvalidGitRepositoryError, NoSuchPathError
from PIL import Image, ImageDraw, ImageFont
from pymediainfo import MediaInfo
from pyrogram import *
from pyrogram.enums import *
from pyrogram.errors import *
from pyrogram.raw.functions.messages import *
from pyrogram.raw.types import *
from pyrogram.types import *
from pyrogram.types import Message

from akn.utils.formatter import humanbytes, readable_time

# User IDs consulted by global_no_spam_title().
DEVS = [1191668125]


def global_no_spam_title(message: Message):
    """Check the "#nodevs" chat-title rule for *message*.

    Returns ``True`` when the chat title contains ``#nodevs``
    (case-insensitive), the sender's ``status`` is neither administrator nor
    owner, and the sender's id is not listed in ``DEVS``. Returns ``False``
    in every other case, including messages that carry no ``from_user``.
    """
    title = getattr(message.chat, "title", None)
    if title is None or "#nodevs" not in title.lower():
        return False
    sender = message.from_user
    if sender is None:
        # No visible sender (previously an AttributeError); nothing to match.
        return False
    # NOTE(review): this compares ``User.status`` with ChatMemberStatus
    # values -- confirm that attribute can actually hold a member status.
    privileged = (
        enums.ChatMemberStatus.ADMINISTRATOR,
        enums.ChatMemberStatus.OWNER,
    )
    return sender.status not in privileged and sender.id not in DEVS


def _text_size(font, text):
    """Return ``(width, height)`` of *text*, as ``font.getsize`` used to."""
    get_bbox = getattr(font, "getbbox", None)
    if get_bbox is None:
        # Older Pillow releases without getbbox().
        return font.getsize(text)
    # getsize() was removed in Pillow 10; the right/bottom edges of the
    # bounding box are the values it used to return.
    _, _, right, bottom = get_bbox(text)
    return math.ceil(right), math.ceil(bottom)


def _draw_centered_lines(draw, font, lines, image_width, y, stroke_width):
    """Draw *lines* horizontally centred, stacked downwards starting at *y*."""
    for line in lines:
        line_width, line_height = _text_size(font, line)
        x = (image_width - line_width) / 2
        draw.text(
            (x, y),
            line,
            fill="white",
            font=font,
            stroke_width=stroke_width,
            stroke_fill="black",
        )
        y += line_height


async def add_text_img(image_path, text):
    """Write caption text onto an image and save it as ``memify.webp``.

    Args:
        image_path: Path of the image to caption.
        text: Caption. Everything before the first ``;`` is drawn at the top
            of the image, everything after it at the bottom.

    Returns:
        The output file name, ``"memify.webp"``.
    """
    font_size = 12  # font size as a percentage of the image height
    stroke_width = 1
    # partition() tolerates more than one ";" (split() + unpacking raised).
    upper_text, _, lower_text = text.partition(";")

    with Image.open(image_path) as source:
        img = source.convert("RGBA")
    img_info = img.info
    image_width, image_height = img.size
    font = ImageFont.truetype(
        font="resources/fonts/default.ttf",
        size=max(1, int(image_height * font_size) // 100),
    )
    draw = ImageDraw.Draw(img)
    char_width, char_height = _text_size(font, "A")
    # textwrap needs a positive integer width, even for very narrow images.
    chars_per_line = max(1, int(image_width // max(1, char_width)))
    top_lines = textwrap.wrap(upper_text, width=chars_per_line)
    bottom_lines = textwrap.wrap(lower_text, width=chars_per_line)

    if top_lines:
        _draw_centered_lines(draw, font, top_lines, image_width, 10, stroke_width)
    if bottom_lines:
        start_y = image_height - char_height * len(bottom_lines) - 15
        _draw_centered_lines(
            draw, font, bottom_lines, image_width, start_y, stroke_width
        )

    final_image = "memify.webp"
    img.save(final_image, **img_info)
    return final_image


# https://github.com/TeamUltroid/pyUltroid/blob/31c271cf4d35ab700e5880e952e54c82046812c2/pyUltroid/functions/helper.py#L154
async def bash(cmd):
    """Run *cmd* through the shell and return ``(stdout, stderr)`` as text.

    Both streams are decoded as UTF-8 with undecodable bytes replaced, then
    stripped.
    """
    process = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    out = stdout.decode("utf-8", "replace").strip()
    err = stderr.decode("utf-8", "replace").strip()
    return out, err


async def resize_media(media: str, video: bool, fast_forward: bool) -> str:
    """Resize a video or image so its longer side is 512 px.

    Args:
        media: Path of the input file. It is deleted after conversion.
        video: ``True`` to convert with ffmpeg to ``<media>.webm``; ``False``
            to resize with Pillow to ``sticker.png``.
        fast_forward: For videos longer than 3 s, speed the clip up to about
            3 s instead of only trimming it.

    Returns:
        Path of the converted file.
    """
    if video:
        info_ = Media_Info.data(media)
        width, height = info_["pixel_sizes"][0], info_["pixel_sizes"][1]
        # "or 0" guards against metadata fields that are missing (None).
        seconds = round(float(info_["duration_in_ms"] or 0)) / 1000

        # -1 tells ffmpeg's scale filter to keep the aspect ratio.
        if height == width:
            height, width = 512, 512
        elif height > width:
            height, width = 512, -1
        elif width > height:
            height, width = -1, 512

        resized_video = f"{media}.webm"
        if fast_forward and seconds > 3:
            fract_ = 3 / seconds
            ff_f = round(fract_, 2)
            # Never round the factor up, so the result stays within 3 s.
            set_pts_ = ff_f - 0.01 if ff_f > fract_ else ff_f
            cmd_f = f"-filter:v 'setpts={set_pts_}*PTS',scale={width}:{height}"
        else:
            cmd_f = f"-filter:v scale={width}:{height}"

        fps_ = float(info_["frame_rate"] or 0)
        fps_cmd = "-r 30 " if fps_ > 30 else ""
        # Paths are quoted because run_cmd() tokenises with shlex.split();
        # -y overwrites a leftover output instead of keeping a stale file.
        cmd = (
            f"ffmpeg -y -i {shlex.quote(media)} {cmd_f} "
            f"-ss 00:00:00 -to 00:00:03 -an -c:v libvpx-vp9 "
            f"{fps_cmd}-fs 256K {shlex.quote(resized_video)}"
        )
        await run_cmd(cmd)
        os.remove(media)
        return resized_video

    maxsize = 512
    resized_photo = "sticker.png"
    # Close the source before deleting it.
    with Image.open(media) as image:
        scale = maxsize / max(image.width, image.height)
        new_size = (
            max(1, int(image.width * scale)),
            max(1, int(image.height * scale)),
        )
        image.resize(new_size, Image.LANCZOS).save(resized_photo)
    os.remove(media)
    return resized_photo


def get_text(message: Message) -> Optional[str]:
    """Extract Text From Commands.

    Returns everything after the first whitespace-separated word of
    ``message.text``, or ``None`` when the text is missing, contains no
    space, or has nothing after the first word.
    """
    text = message.text
    if text is None or " " not in text:
        return None
    parts = text.split(None, 1)
    return parts[1] if len(parts) > 1 else None


def get_arg(message: Message):
    """Return the text following the command word, or ``""`` if there is none.

    The first character (the command prefix) is dropped, and a single space
    directly after it is tolerated. Newlines in the argument are kept.
    """
    msg = message.text
    if not msg:
        return ""
    if len(msg) > 1 and msg[1] == " ":
        msg = msg.replace(" ", "", 1)
    words = msg[1:].replace("\n", " \n").split(" ")
    arg = " ".join(words[1:])
    return arg if arg.strip() else ""


def get_args(message: Message):
    """Split the arguments of a command into shell-style tokens.

    *message* may be a ``Message`` or a plain string.

    Returns:
        ``False`` for empty input, ``[]`` when nothing follows the command
        word, the raw argument string when ``shlex`` cannot parse it (e.g.
        unbalanced quotes), otherwise the list of non-empty tokens.
    """
    try:
        text = message.text
    except AttributeError:
        text = message
    if not text:
        return False
    parts = text.split(maxsplit=1)
    if len(parts) <= 1:
        return []
    raw_args = parts[1]
    try:
        tokens = shlex.split(raw_args)
    except ValueError:
        return raw_args
    return [token for token in tokens if token]


async def run_cmd(cmd: str) -> Tuple[str, str, int, int]:
    """Run Commands.

    *cmd* is tokenised with ``shlex.split`` and executed without a shell.

    Returns:
        ``(stdout, stderr, returncode, pid)``; both streams are decoded as
        UTF-8 with replacement and stripped.
    """
    args = shlex.split(cmd)
    process = await asyncio.create_subprocess_exec(
        *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await process.communicate()
    return (
        stdout.decode("utf-8", "replace").strip(),
        stderr.decode("utf-8", "replace").strip(),
        process.returncode,
        process.pid,
    )


async def convert_to_image(message, client) -> Optional[str]:
    """Convert Most Media Formats To Raw Image.

    Works on the message that *message* replies to.

    Returns:
        Path of the resulting image, or ``None`` when there is no replied-to
        message, it carries no supported media, or an audio has no thumbnail.
    """
    if not message or not message.reply_to_message:
        return None
    reply = message.reply_to_message
    if not (
        reply.video
        or reply.photo
        or reply.sticker
        or reply.media
        or reply.animation
        or reply.audio
    ):
        return None

    final_path = None
    if reply.photo:
        final_path = await reply.download()
    elif reply.sticker:
        if reply.sticker.mime_type == "image/webp":
            final_path = "webp_to_png_s_proton.png"
            path_s = await reply.download()
            with Image.open(path_s) as im:
                im.save(final_path, "PNG")
        else:
            path_s = await client.download_media(reply)
            final_path = "lottie_proton.png"
            cmd = (
                "lottie_convert.py --frame 0 -if lottie -of png "
                f"{shlex.quote(str(path_s))} {final_path}"
            )
            await run_cmd(cmd)
    elif reply.audio:
        thumbs = reply.audio.thumbs
        if thumbs:  # audio files are not guaranteed to carry a thumbnail
            final_path = await client.download_media(thumbs[0].file_id)
    elif reply.video or reply.animation:
        final_path = "fetched_thumb.png"
        vid_path = await client.download_media(reply)
        # -y: the output name is fixed, so overwrite a previous thumbnail.
        # -frames:v 1: a single PNG can only hold the first frame anyway.
        await run_cmd(
            f"ffmpeg -y -i {shlex.quote(str(vid_path))} "
            f"-filter:v scale=500:500 -frames:v 1 -an {final_path}"
        )
    return final_path


async def get_ub_chats(
    client: Client,
    chat_types: Optional[list] = None,
    is_id_only=True,
):
    """Collect the client's dialogs whose chat type is in *chat_types*.

    Args:
        client: Client whose dialogs are iterated.
        chat_types: Accepted chat types; defaults to group, supergroup and
            channel.
        is_id_only: Return chat ids when true, chat objects otherwise.
    """
    if chat_types is None:
        chat_types = [
            enums.ChatType.GROUP,
            enums.ChatType.SUPERGROUP,
            enums.ChatType.CHANNEL,
        ]
    ub_chats = []
    async for dialog in client.get_dialogs():
        if dialog.chat.type in chat_types:
            ub_chats.append(dialog.chat.id if is_id_only else dialog.chat)
    return ub_chats


def ReplyCheck(message: Message):
    """Pick the message id a response should reply to.

    Returns the id of the replied-to message if there is one; otherwise the
    id of *message* itself unless it was sent by the current user, in which
    case ``None``.
    """
    if message.reply_to_message:
        return message.reply_to_message.id
    sender = message.from_user
    # NOTE(review): a message without from_user is treated as "not sent by
    # self" (it used to raise AttributeError) -- confirm this fits callers.
    if sender is None or not sender.is_self:
        return message.id
    return None


def SpeedConvert(size):
    """Format *size* with a Kbit/s..Tbit/s suffix using 1024 steps.

    Values beyond the largest unit stay expressed in Tbit/s.
    """
    power = 2**10
    # NOTE(review): the base unit has an empty label, so small values render
    # as e.g. "512 " -- kept unchanged; confirm whether "bit/s" was intended.
    units = ("", "Kbit/s", "Mbit/s", "Gbit/s", "Tbit/s")
    index = 0
    while size > power and index < len(units) - 1:
        size /= power
        index += 1
    return f"{round(size, 2)} {units[index]}"


def GetFromUserID(message: Message):
    """Return the id of the user who sent *message*."""
    return message.from_user.id


def GetChatID(message: Message):
    """Return the id of the chat *message* belongs to."""
    return message.chat.id


def GetUserMentionable(user: User):
    """Return text that mentions *user*.

    ``@username`` when the user has a username, otherwise an HTML
    ``tg://user?id=`` link labelled with the user's name.
    """
    import html  # local import: only needed here

    if user.username:
        return "@{}".format(user.username)
    if user.last_name:
        name_string = "{} {}".format(user.first_name, user.last_name)
    else:
        name_string = "{}".format(user.first_name)
    # The name used to be computed and then dropped by a bare "{}" format,
    # leaving only the numeric id; emit the inline mention link instead.
    return "<a href='tg://user?id={}'>{}</a>".format(
        user.id, html.escape(name_string, quote=False)
    )


def resize_image(image):
    """Fit an image into 512x512 and save it as ``Sticker.png``.

    Images smaller than 512 px on both sides are scaled up so the longer
    side becomes 512; larger ones are shrunk with ``thumbnail``. The source
    file is removed afterwards.

    Returns:
        The output file name, ``"Sticker.png"``.
    """
    file_name = "Sticker.png"
    with Image.open(image) as im:
        # Both sides must be checked; "(w and h) < 512" only tested height.
        if im.width < 512 and im.height < 512:
            if im.width > im.height:
                scale = 512 / im.width
                new_size = (512, max(1, math.floor(im.height * scale)))
            else:
                scale = 512 / im.height
                new_size = (max(1, math.floor(im.width * scale)), 512)
            resized = im.resize(new_size)
        else:
            im.thumbnail((512, 512))
            resized = im
        resized.save(file_name, "PNG")
    if os.path.exists(image):
        os.remove(image)
    return file_name


class Media_Info:
    """Namespace for reading media metadata through pymediainfo."""

    @staticmethod
    def data(media: str) -> Optional[dict]:
        "Get downloaded media's information"
        info = None
        media_info = MediaInfo.parse(media)
        for track in media_info.tracks:
            if track.track_type != "Video":
                continue
            other_duration_ = track.other_duration
            other_aspect_ratio_ = track.other_display_aspect_ratio
            other_media_size_ = track.other_stream_size
            # No break: if several video tracks exist, the last one wins.
            info = {
                "media_type": track.track_type,
                "format": track.format,
                "duration_in_ms": track.duration,
                "duration": (
                    f"{other_duration_[0]} - ({other_duration_[3]})"
                    if other_duration_
                    else None
                ),
                "pixel_sizes": [track.width, track.height],
                "aspect_ratio_in_fraction": track.display_aspect_ratio,
                "aspect_ratio": (
                    other_aspect_ratio_[0] if other_aspect_ratio_ else None
                ),
                "frame_rate": track.frame_rate,
                "frame_count": track.frame_count,
                "file_size_in_bytes": track.stream_size,
                "file_size": (
                    [
                        other_media_size_[1],
                        other_media_size_[2],
                        other_media_size_[3],
                        other_media_size_[4],
                    ]
                    if other_media_size_
                    else None
                ),
            }
        # None when the file has no video track.
        return info


async def get_files_from_directory(directory: str):
    """Return the paths of all files under *directory*, recursively."""
    return [
        os.path.join(path, file)
        for path, _, files in os.walk(directory)
        for file in files
    ]


async def runcmd(cmd: str) -> Tuple[str, str, int, int]:
    """Alias of :func:`run_cmd`, kept for existing callers."""
    return await run_cmd(cmd)


async def update_dotenv(key: str, value: str) -> None:
    """Set ``key=value`` in the ``.env`` file of the working directory.

    The first line starting with ``key=`` is replaced; when no such line
    exists the entry is appended instead of being silently dropped.
    """
    with open(".env", "r") as file:
        data = file.readlines()

    entry = f"{key}={value}\n"
    for index, line in enumerate(data):
        if line.startswith(f"{key}="):
            data[index] = entry
            break
    else:
        # Keep the new entry on its own line if the file lacks a final newline.
        if data and not data[-1].endswith("\n"):
            data[-1] += "\n"
        data.append(entry)

    with open(".env", "w") as file:
        file.writelines(data)


async def gen_changelogs(repo: Repo, branch: str) -> str:
    """Return a numbered list of the 5 newest commit summaries on *branch*."""
    # max_count limits the walk instead of loading the whole history.
    commits = repo.iter_commits(branch, max_count=5)
    return "".join(
        f"**{index}.** `{commit.summary}`\n"
        for index, commit in enumerate(commits, start=1)
    )


async def initialize_git(git_repo: str):
    """Open the repository in the working directory, creating it if needed.

    Args:
        git_repo: GitHub ``owner/name`` used for the ``upstream`` remote.

    Returns:
        ``(True, repo, force)`` on success, where ``force`` is ``True`` when
        the repository had to be initialised from ``upstream``;
        ``(False, error, False)`` when opening failed with
        ``NoSuchPathError`` or ``GitCommandError``.
    """
    force = False
    upstream_url = f"https://github.com/{git_repo}"
    try:
        repo = Repo()
    except (NoSuchPathError, GitCommandError) as err:
        # ``repo`` is unbound here, so there is nothing to release.
        return False, err, force
    except InvalidGitRepositoryError:
        repo = Repo.init()
        origin = repo.create_remote("upstream", upstream_url)
        origin.fetch()
        repo.create_head("master", origin.refs.master)
        repo.heads.master.set_tracking_branch(origin.refs.master)
        repo.heads.master.checkout(True)
        force = True
    # Best effort: the remote usually exists already, which git reports as a
    # command error.
    with contextlib.suppress(GitCommandError):
        repo.create_remote("upstream", upstream_url)
    return True, repo, force