# Ultroid - UserBot # Copyright (C) 2021-2025 TeamUltroid # # This file is a part of < https://github.com/TeamUltroid/Ultroid/ > # PLease read the GNU Affero General Public License in # . import json import math import os import random import re, subprocess import secrets import ssl, html from io import BytesIO from json.decoder import JSONDecodeError from traceback import format_exc import requests from .. import * from ..exceptions import DependencyMissingError from . import some_random_headers from .helper import async_searcher, bash, run_async try: import certifi except ImportError: certifi = None try: from PIL import Image, ImageDraw, ImageFont except ImportError: Image, ImageDraw, ImageFont = None, None, None LOGS.info("PIL not installed!") from urllib.parse import quote, unquote from telethon import Button from telethon.tl.types import DocumentAttributeAudio, DocumentAttributeVideo if run_as_module: from ..dB.filestore_db import get_stored_msg, store_msg try: import numpy as np except ImportError: np = None try: from telegraph import Telegraph except ImportError: Telegraph = None try: from bs4 import BeautifulSoup except ImportError: BeautifulSoup = None # ~~~~~~~~~~~~~~~~~~~~OFOX API~~~~~~~~~~~~~~~~~~~~ # @buddhhu async def get_ofox(codename): ofox_baseurl = "https://api.orangefox.download/v3/" releases = await async_searcher( ofox_baseurl + "releases?codename=" + codename, re_json=True ) device = await async_searcher( ofox_baseurl + "devices/get?codename=" + codename, re_json=True ) return device, releases # ~~~~~~~~~~~~~~~JSON Parser~~~~~~~~~~~~~~~ # @buddhhu def _unquote_text(text): return text.replace("'", unquote("%5C%27")).replace('"', unquote("%5C%22")) def json_parser(data, indent=None, ascii=False): parsed = {} try: if isinstance(data, str): parsed = json.loads(str(data)) if indent: parsed = json.dumps( json.loads(str(data)), indent=indent, ensure_ascii=ascii ) elif isinstance(data, dict): parsed = data if indent: parsed = json.dumps(data, indent=indent, ensure_ascii=ascii) except JSONDecodeError: parsed = eval(data) return parsed # ~~~~~~~~~~~~~~~~Link Checker~~~~~~~~~~~~~~~~~ async def is_url_ok(url: str): try: return await async_searcher(url, head=True) except BaseException as er: LOGS.debug(er) return False # ~~~~~~~~~~~~~~~~ Metadata ~~~~~~~~~~~~~~~~~~~~ async def metadata(file): out, _ = await bash(f'mediainfo "{_unquote_text(file)}" --Output=JSON') if _ and _.endswith("NOT_FOUND"): raise DependencyMissingError( f"'{_}' is not installed!\nInstall it to use this command." ) data = {} _info = json.loads(out)["media"] if not _info: return {} _info = _info["track"] info = _info[0] if info.get("Format") in ["GIF", "PNG"]: return { "height": _info[1]["Height"], "width": _info[1]["Width"], "bitrate": _info[1].get("BitRate", 320), } if info.get("AudioCount"): data["title"] = info.get("Title", file) data["performer"] = info.get("Performer") or udB.get_key("artist") or "" if info.get("VideoCount"): data["height"] = int(float(_info[1].get("Height", 720))) data["width"] = int(float(_info[1].get("Width", 1280))) data["bitrate"] = int(_info[1].get("BitRate", 320)) data["duration"] = int(float(info.get("Duration", 0))) return data # ~~~~~~~~~~~~~~~~ Attributes ~~~~~~~~~~~~~~~~ async def set_attributes(file): data = await metadata(file) if not data: return None if "width" in data: return [ DocumentAttributeVideo( duration=data.get("duration", 0), w=data.get("width", 512), h=data.get("height", 512), supports_streaming=True, ) ] ext = "." + file.split(".")[-1] return [ DocumentAttributeAudio( duration=data.get("duration", 0), title=data.get("title", file.split("/")[-1].replace(ext, "")), performer=data.get("performer"), ) ] # ~~~~~~~~~~~~~~~~ Button stuffs ~~~~~~~~~~~~~~~ def get_msg_button(texts: str): btn = [] for z in re.findall("\\[(.*?)\\|(.*?)\\]", texts): text, url = z urls = url.split("|") url = urls[0] if len(urls) > 1: btn[-1].append([text, url]) else: btn.append([[text, url]]) txt = texts for z in re.findall("\\[.+?\\|.+?\\]", texts): txt = txt.replace(z, "") return txt.strip(), btn def create_tl_btn(button: list): btn = [] for z in button: if len(z) > 1: kk = [Button.url(x, y.strip()) for x, y in z] btn.append(kk) else: btn.append([Button.url(z[0][0], z[0][1].strip())]) return btn def format_btn(buttons: list): txt = "" for i in buttons: a = 0 for i in i: if hasattr(i.button, "url"): a += 1 if a > 1: txt += f"[{i.button.text} | {i.button.url} | same]" else: txt += f"[{i.button.text} | {i.button.url}]" _, btn = get_msg_button(txt) return btn # ~~~~~~~~~~~~~~~Saavn Downloader~~~~~~~~~~~~~~~ # @techierror async def saavn_search(query: str): try: data = await async_searcher( url=f"https://saavn-api.vercel.app/search/{query.replace(' ', '%20')}", re_json=True, ) except BaseException: data = None return data # --- webupload ------# # @buddhhu _webupload_cache = {} async def webuploader(chat_id: int, msg_id: int, uploader: str): LOGS.info("webuploader function called with uploader: %s", uploader) if chat_id in _webupload_cache and msg_id in _webupload_cache[chat_id]: file = _webupload_cache[chat_id][msg_id] else: return "File not found in cache." sites = { "siasky": {"url": "https://siasky.net/skynet/skyfile", "json": True}, "file.io": {"url": "https://file.io/", "json": True}, "0x0.st": {"url": "https://0x0.st", "json": False}, "transfer": {"url": "https://transfer.sh", "json": False}, "catbox": {"url": "https://catbox.moe/user/api.php", "json": False}, "filebin": {"url": "https://filebin.net", "json": False}, } if uploader and uploader in sites: url = sites[uploader]["url"] json_format = sites[uploader]["json"] else: return "Uploader not supported or invalid." files = {"file": open(file, "rb")} # Adjusted for both formats try: if uploader == "filebin": cmd = f"curl -X POST --data-binary '@{file}' -H 'filename: \"{file}\"' \"{url}\"" response = subprocess.run(cmd, shell=True, capture_output=True, text=True) if response.returncode == 0: response_json = json.loads(response.stdout) bin_id = response_json.get("bin", {}).get("id") if bin_id: filebin_url = f"https://filebin.net/{bin_id}" return filebin_url else: return "Failed to extract bin ID from Filebin response" else: return f"Failed to upload file to Filebin: {response.stderr.strip()}" elif uploader == "catbox": cmd = f"curl -F reqtype=fileupload -F time=24h -F 'fileToUpload=@{file}' {url}" elif uploader == "0x0.st": cmd = f"curl -F 'file=@{file}' {url}" elif uploader == "file.io" or uploader == "siasky": try: status = await async_searcher( url, data=files, post=True, re_json=json_format ) except Exception as e: return f"Failed to upload file: {e}" if isinstance(status, dict): if "skylink" in status: return f"https://siasky.net/{status['skylink']}" if status.get("status") == 200: return status.get("link") else: raise ValueError("Uploader not supported") response = subprocess.run(cmd, shell=True, capture_output=True, text=True) if response.returncode == 0: return response.stdout.strip() else: return f"Failed to upload file: {response.stderr.strip()}" except Exception as e: return f"Failed to upload file: {e}" del _webupload_cache.get(chat_id, {})[msg_id] return "Failed to get valid URL for the uploaded file." def get_all_files(path, extension=None): filelist = [] for root, dirs, files in os.walk(path): for file in files: if not (extension and not file.endswith(extension)): filelist.append(os.path.join(root, file)) return sorted(filelist) def text_set(text): lines = [] if len(text) <= 55: lines.append(text) else: all_lines = text.split("\n") for line in all_lines: if len(line) <= 55: lines.append(line) else: k = len(line) // 55 for z in range(1, k + 2): lines.append(line[((z - 1) * 55) : (z * 55)]) return lines[:25] # ------------------Logo Gen Helpers---------------- # @TechiError class LogoHelper: @staticmethod def get_text_size(text, image, font): im = Image.new("RGB", (image.width, image.height)) draw = ImageDraw.Draw(im) return draw.textlength(text, font) @staticmethod def find_font_size(text, font, image, target_width_ratio): tested_font_size = 100 tested_font = ImageFont.truetype(font, tested_font_size) observed_width = LogoHelper.get_text_size(text, image, tested_font) estimated_font_size = ( tested_font_size / (observed_width / image.width) * target_width_ratio ) return round(estimated_font_size) @staticmethod def make_logo(imgpath, text, funt, **args): fill = args.get("fill") width_ratio = args.get("width_ratio") or 0.7 stroke_width = int(args.get("stroke_width")) stroke_fill = args.get("stroke_fill") img = Image.open(imgpath) width, height = img.size fct = min(height, width) if height != width: img = img.crop((0, 0, fct, fct)) if img.height < 1000: img = img.resize((1020, 1020)) width, height = img.size draw = ImageDraw.Draw(img) font_size = LogoHelper.find_font_size(text, funt, img, width_ratio) font = ImageFont.truetype(funt, font_size) l, t, r, b = font.getbbox(text) w, h = r - l, (b - t) * 1.5 draw.text( ((width - w) / 2, ((height - h) / 2)), text, font=font, fill=fill, stroke_width=stroke_width, stroke_fill=stroke_fill, ) file_name = check_filename("logo.png") img.save(file_name, "PNG") return file_name # -------------------------------------- # @New-Dev0 async def get_paste(data: str, extension: str = "txt"): try: url = "https://spaceb.in/api/" res = await async_searcher(url, json={"content": data, "extension": extension}, post=True, re_json=True) return True, { "link": f"https://spaceb.in/{res['payload']['id']}", "raw": f"https://spaceb.in/{res['payload']['id']}/raw" } except Exception: try: url = "https://dpaste.org/api/" data = { 'format': 'json', 'content': data.encode('utf-8'), 'lexer': extension, 'expires': '604800', # expire in week } res = await async_searcher(url, data=data, post=True, re_json=True) return True, { "link": res["url"], "raw": f'{res["url"]}/raw' } except Exception as e: LOGS.info(e) return None, { "link": None, "raw": None, "error": str(e) } # https://stackoverflow.com/a/74563494 async def get_google_images(query): """Get image results from Google Custom Search API. Args: query (str): Search query string Returns: list: List of dicts containing image info (title, link, source, thumbnail, original) """ LOGS.info(f"Searching Google Images for: {query}") # Google Custom Search API credentials google_keys = [ { "key": "AIzaSyAj75v6vHWLJdJaYcj44tLz7bdsrh2g7Y0", "cx": "712a54749d99a449e" }, { "key": "AIzaSyDFQQwPLCzcJ9FDao-B7zDusBxk8GoZ0HY", "cx": "001bbd139705f44a6" }, { "key": "AIzaSyD0sRNZUa8-0kq9LAREDAFKLNO1HPmikRU", "cx": "4717c609c54e24250" } ] key_index = random.randint(0, len(google_keys) - 1) GOOGLE_API_KEY = google_keys[key_index]["key"] GOOGLE_CX = google_keys[key_index]["cx"] try: # Construct API URL url = ( "https://www.googleapis.com/customsearch/v1" f"?q={quote(query)}" f"&cx={GOOGLE_CX}" f"&key={GOOGLE_API_KEY}" "&searchType=image" "&num=10" # Number of results ) # Make API request response = await async_searcher(url, re_json=True) print("response") if not response or "items" not in response: LOGS.error("No results from Google Custom Search API") return [] # Process results google_images = [] for item in response["items"]: try: google_images.append({ "title": item.get("title", ""), "link": item.get("contextLink", ""), # Page containing image "source": item.get("displayLink", ""), "thumbnail": item.get("image", {}).get("thumbnailLink", item["link"]), "original": item["link"] # Original image URL }) except Exception as e: LOGS.warning(f"Failed to process image result: {str(e)}") continue # Randomize results order random.shuffle(google_images) LOGS.info(f"Found {len(google_images)} images for query: {query}") return google_images except Exception as e: LOGS.exception(f"Error in get_google_images: {str(e)}") return [] # Thanks https://t.me/ImSafone for ChatBotApi async def get_chatbot_reply(message): chatbot_base = "https://api.safone.dev/chatbot?query={}" req_link = chatbot_base.format( message, ) try: return (await async_searcher(req_link, re_json=True)).get("response") except Exception: LOGS.info(f"**ERROR:**`{format_exc()}`") def check_filename(filroid): if os.path.exists(filroid): no = 1 while True: ult = "{0}_{2}{1}".format(*os.path.splitext(filroid) + (no,)) if os.path.exists(ult): no += 1 else: return ult return filroid # ------ Audio \\ Video tools funcn --------# # https://github.com/1Danish-00/CompressorBot/blob/main/helper/funcn.py#L104 async def genss(file): return (await metadata(file)).get("duration") async def duration_s(file, stime): tsec = await genss(file) x = round(tsec / 5) y = round(tsec / 5 + int(stime)) pin = stdr(x) pon = stdr(y) if y < tsec else stdr(tsec) return pin, pon def stdr(seconds): minutes, seconds = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) if len(str(minutes)) == 1: minutes = "0" + str(minutes) if len(str(hours)) == 1: hours = "0" + str(hours) if len(str(seconds)) == 1: seconds = "0" + str(seconds) return ( ((str(hours) + ":") if hours else "00:") + ((str(minutes) + ":") if minutes else "00:") + ((str(seconds)) if seconds else "") ) # ------------------- used in pdftools --------------------# # https://www.pyimagesearch.com/2014/08/25/4-point-opencv-getperspective-transform-example def order_points(pts): "get the required points" rect = np.zeros((4, 2), dtype="float32") s = pts.sum(axis=1) rect[0] = pts[np.argmin(s)] rect[2] = pts[np.argmax(s)] diff = np.diff(pts, axis=1) rect[1] = pts[np.argmin(diff)] rect[3] = pts[np.argmax(diff)] return rect def four_point_transform(image, pts): try: import cv2 except ImportError: raise DependencyMissingError("This function needs 'cv2' to be installed.") rect = order_points(pts) (tl, tr, br, bl) = rect widthA = np.sqrt(((br[0] - bl[0]) ** 2) + ((br[1] - bl[1]) ** 2)) widthB = np.sqrt(((tr[0] - tl[0]) ** 2) + ((tr[1] - tl[1]) ** 2)) maxWidth = max(int(widthA), int(widthB)) heightA = np.sqrt(((tr[0] - br[0]) ** 2) + ((tr[1] - br[1]) ** 2)) heightB = np.sqrt(((tl[0] - bl[0]) ** 2) + ((tl[1] - bl[1]) ** 2)) maxHeight = max(int(heightA), int(heightB)) dst = np.array( [[0, 0], [maxWidth - 1, 0], [maxWidth - 1, maxHeight - 1], [0, maxHeight - 1]], dtype="float32", ) M = cv2.getPerspectiveTransform(rect, dst) return cv2.warpPerspective(image, M, (maxWidth, maxHeight)) # ------------------------------------- Telegraph ---------------------------------- # TELEGRAPH = [] def telegraph_client(): if not Telegraph: LOGS.info("'Telegraph' is not Installed!") return if TELEGRAPH: return TELEGRAPH[0] from .. import udB, ultroid_bot token = udB.get_key("_TELEGRAPH_TOKEN") TELEGRAPH_DOMAIN = udB.get_key("GRAPH_DOMAIN") TelegraphClient = Telegraph(token, domain=TELEGRAPH_DOMAIN or "graph.org") if token: TELEGRAPH.append(TelegraphClient) return TelegraphClient gd_name = ultroid_bot.full_name short_name = gd_name[:30] profile_url = ( f"https://t.me/{ultroid_bot.me.username}" if ultroid_bot.me.username else "https://t.me/TeamUltroid" ) try: TelegraphClient.create_account( short_name=short_name, author_name=gd_name, author_url=profile_url ) except Exception as er: if "SHORT_NAME_TOO_LONG" in str(er): TelegraphClient.create_account( short_name="ultroiduser", author_name=gd_name, author_url=profile_url ) else: LOGS.exception(er) return udB.set_key("_TELEGRAPH_TOKEN", TelegraphClient.get_access_token()) TELEGRAPH.append(TelegraphClient) return TelegraphClient @run_async def make_html_telegraph(title, html=""): telegraph = telegraph_client() page = telegraph.create_page( title=title, html_content=html, ) return page["url"] async def Carbon( code, base_url="https://carbonara.solopov.dev/api/cook", file_name="ultroid", download=False, rayso=False, **kwargs, ): if rayso: base_url = "https://rayso-api-desvhu-33.koyeb.app/generate" kwargs["text"] = code kwargs["theme"] = kwargs.get("theme", "breeze") kwargs["darkMode"] = kwargs.get("darkMode", True) kwargs["title"] = kwargs.get("title", "Ultroid") else: kwargs["code"] = code con = await async_searcher(base_url, post=True, json=kwargs, re_content=True) if not download: file = BytesIO(con) file.name = file_name + ".jpg" else: try: return json_parser(con.decode()) except Exception: pass file = file_name + ".jpg" with open(file, "wb") as f: f.write(con) return file async def get_file_link(msg): from .. import udB msg_id = await msg.forward_to(udB.get_key("LOG_CHANNEL")) await msg_id.reply( "**Message has been stored to generate a shareable link. Do not delete it.**" ) msg_id = msg_id.id msg_hash = secrets.token_hex(nbytes=8) store_msg(msg_hash, msg_id) return msg_hash async def get_stored_file(event, hash): from .. import udB, asst msg_id = get_stored_msg(hash) if not msg_id: return try: msg = await asst.get_messages(udB.get_key("LOG_CHANNEL"), ids=msg_id) except Exception as er: LOGS.warning(f"FileStore, Error: {er}") return if not msg_id: return await asst.send_message( event.chat_id, "__Message was deleted by owner!__", reply_to=event.id ) await asst.send_message(event.chat_id, msg.text, file=msg.media, reply_to=event.id) def translate(text, lang_tgt="en", lang_src="auto", timeout=60, detect=False): pattern = r'(?s)class="(?:t0|result-container)">(.*?)<' escaped_text = quote(text.encode("utf8")) url = "https://translate.google.com/m?tl=%s&sl=%s&q=%s" % ( lang_tgt, lang_src, escaped_text, ) response = requests.get(url, timeout=timeout).content result = response.decode("utf8") result = re.findall(pattern, result) if not result: return "" text = html.unescape(result[0]) return (text, None) if detect else text def cmd_regex_replace(cmd): return ( cmd.replace("$", "") .replace("?(.*)", "") .replace("(.*)", "") .replace("(?: |)", "") .replace("| ", "") .replace("( |)", "") .replace("?((.|//)*)", "") .replace("?P\\w+", "") .replace("(", "") .replace(")", "") .replace("?(\\d+)", "") ) # ------------------------# class LottieException(Exception): ... class TgConverter: """Convert files related to Telegram""" @staticmethod async def animated_sticker(file, out_path="sticker.tgs", throw=False, remove=False): """Convert to/from animated sticker.""" LOGS.info(f"Converting animated sticker: {file} -> {out_path}") try: if out_path.endswith("webp"): er, out = await bash( f"lottie_convert.py --webp-quality 100 --webp-skip-frames 100 '{file}' '{out_path}'" ) else: er, out = await bash(f"lottie_convert.py '{file}' '{out_path}'") if er: LOGS.error(f"Error in animated_sticker conversion: {er}") if throw: raise LottieException(er) if remove and os.path.exists(file): os.remove(file) LOGS.info(f"Removed original file: {file}") if os.path.exists(out_path): LOGS.info(f"Successfully converted to {out_path}") return out_path LOGS.error(f"Output file not created: {out_path}") return None except Exception as e: LOGS.exception(f"Unexpected error in animated_sticker: {str(e)}") if throw: raise @staticmethod async def animated_to_gif(file, out_path="gif.gif"): """Convert animated sticker to gif.""" LOGS.info(f"Converting to gif: {file} -> {out_path}") try: er, out = await bash( f"lottie_convert.py '{_unquote_text(file)}' '{_unquote_text(out_path)}'" ) if er: LOGS.error(f"Error in animated_to_gif conversion: {er}") if os.path.exists(out_path): LOGS.info("Successfully converted to gif") return out_path LOGS.error("Gif conversion failed - output file not created") return None except Exception as e: LOGS.exception(f"Unexpected error in animated_to_gif: {str(e)}") return None @staticmethod def resize_photo_sticker(photo): """Resize the given photo to 512x512 (for creating telegram sticker).""" LOGS.info(f"Resizing photo for sticker: {photo}") try: image = Image.open(photo) original_size = (image.width, image.height) if (image.width and image.height) < 512: size1 = image.width size2 = image.height if image.width > image.height: scale = 512 / size1 size1new = 512 size2new = size2 * scale else: scale = 512 / size2 size1new = size1 * scale size2new = 512 size1new = math.floor(size1new) size2new = math.floor(size2new) sizenew = (size1new, size2new) image = image.resize(sizenew) else: maxsize = (512, 512) image.thumbnail(maxsize) LOGS.info(f"Resized image from {original_size} to {image.size}") return image except Exception as e: LOGS.exception(f"Error in resize_photo_sticker: {str(e)}") raise @staticmethod async def ffmpeg_convert(input_, output, remove=False): if output.endswith(".webm"): return await TgConverter.create_webm( input_, name=output[:-5], remove=remove ) if output.endswith(".gif"): out, er = await bash(f"ffmpeg -i '{input_}' -an -sn -c:v copy '{output}.mp4' -y") LOGS.info(f"FFmpeg output: {out}, Error: {er}") else: out, er = await bash(f"ffmpeg -i '{input_}' '{output}' -y") LOGS.info(f"FFmpeg output: {out}, Error: {er}") if remove: os.remove(input_) if os.path.exists(output): return output @staticmethod async def create_webm(file, name="video", remove=False): LOGS.info(f"Creating webm: {file} -> {name}.webm") try: _ = await metadata(file) name += ".webm" h, w = _["height"], _["width"] if h == w and h != 512: h, w = 512, 512 if h != 512 or w != 512: if h > w: h, w = 512, -1 if w > h: h, w = -1, 512 await bash( f'ffmpeg -i "{file}" -preset fast -an -to 00:00:03 -crf 30 -bufsize 256k -b:v {_["bitrate"]} -vf "scale={w}:{h},fps=30" -c:v libvpx-vp9 "{name}" -y' ) if remove and os.path.exists(file): os.remove(file) LOGS.info(f"Removed original file: {file}") if os.path.exists(name): LOGS.info(f"Successfully created webm: {name}") return name LOGS.error(f"Webm creation failed - output file not created: {name}") return None except Exception as e: LOGS.exception(f"Error in create_webm: {str(e)}") return None @staticmethod def to_image(input_, name, remove=False): """Convert video/gif to image using first frame.""" LOGS.info(f"Converting to image: {input_} -> {name}") try: if not input_: LOGS.error("Input file is None") return None if not os.path.exists(input_): LOGS.error(f"Input file does not exist: {input_}") return None try: import cv2 except ImportError: raise DependencyMissingError("This function needs 'cv2' to be installed.") img = cv2.VideoCapture(input_) success, frame = img.read() if not success: LOGS.error(f"Failed to read frame from {input_}") return None cv2.imwrite(name, frame) img.release() if not os.path.exists(name): LOGS.error(f"Failed to save image: {name}") return None if remove and os.path.exists(input_): os.remove(input_) LOGS.info(f"Removed original file: {input_}") LOGS.info(f"Successfully converted to image: {name}") return name except Exception as e: LOGS.exception(f"Error in to_image conversion: {str(e)}") return None @staticmethod async def convert( input_file, outname="converted", convert_to=None, allowed_formats=[], remove_old=True, ): """Convert between different file formats.""" LOGS.info(f"Converting {input_file} to {convert_to or allowed_formats}") if not input_file: LOGS.error("Input file is None") return None if not os.path.exists(input_file): LOGS.error(f"Input file does not exist: {input_file}") return None if "." in input_file: ext = input_file.split(".")[-1].lower() else: LOGS.error("Input file has no extension") return input_file if ( ext in allowed_formats or ext == convert_to or not (convert_to or allowed_formats) ): return input_file def recycle_type(exte): return convert_to == exte or exte in allowed_formats try: # Sticker to Something if ext == "tgs": for extn in ["webp", "json", "png", "mp4", "gif"]: if recycle_type(extn): name = outname + "." + extn result = await TgConverter.animated_sticker( input_file, name, remove=remove_old ) if result: return result if recycle_type("webm"): gif_file = await TgConverter.convert( input_file, convert_to="gif", remove_old=remove_old ) if gif_file: return await TgConverter.create_webm(gif_file, outname, remove=True) # Json -> Tgs elif ext == "json": if recycle_type("tgs"): name = outname + ".tgs" return await TgConverter.animated_sticker( input_file, name, remove=remove_old ) # Video to Something elif ext in ["webm", "mp4", "gif"]: for exte in ["webm", "mp4", "gif"]: if recycle_type(exte): name = outname + "." + exte result = await TgConverter.ffmpeg_convert( input_file, name, remove=remove_old ) if result: return result for exte in ["png", "jpg", "jpeg", "webp"]: if recycle_type(exte): name = outname + "." + exte result = TgConverter.to_image(input_file, name, remove=remove_old) if result: return result # Image to Something elif ext in ["jpg", "jpeg", "png", "webp"]: for extn in ["png", "webp", "ico"]: if recycle_type(extn): try: img = Image.open(input_file) name = outname + "." + extn img.save(name, extn.upper()) if remove_old and os.path.exists(input_file): os.remove(input_file) LOGS.info(f"Removed original file: {input_file}") return name except Exception as e: LOGS.error(f"Failed to convert image to {extn}: {str(e)}") continue for extn in ["webm", "gif", "mp4"]: if recycle_type(extn): name = outname + "." + extn if extn == "webm": png_file = await TgConverter.convert( input_file, convert_to="png", remove_old=remove_old, ) if png_file: return await TgConverter.ffmpeg_convert( png_file, name, remove=True ) else: return await TgConverter.ffmpeg_convert( input_file, name, remove=remove_old ) LOGS.error(f"No valid conversion found for {input_file} to {convert_to or allowed_formats}") return None except Exception as e: LOGS.exception(f"Error in convert: {str(e)}") return None def _get_value(stri): try: value = eval(stri.strip()) except Exception as er: from .. import LOGS LOGS.debug(er) value = stri.strip() return value def safe_load(file, *args, **kwargs): if isinstance(file, str): read = file.split("\n") else: read = file.readlines() out = {} for line in read: if ":" in line: # Ignores Empty & Invalid lines spli = line.split(":", maxsplit=1) key = spli[0].strip() value = _get_value(spli[1]) out.update({key: value or []}) elif "-" in line: spli = line.split("-", maxsplit=1) where = out[list(out.keys())[-1]] if isinstance(where, list): value = _get_value(spli[1]) if value: where.append(value) return out def get_chat_and_msgid(link): matches = re.findall("https:\\/\\/t\\.me\\/(c\\/|)(.*)\\/(.*)", link) if not matches: return None, None _, chat, msg_id = matches[0] if chat.isdigit(): chat = int("-100" + chat) return chat, int(msg_id) # --------- END --------- #