# Ultroid - UserBot # Copyright (C) 2021-2025 TeamUltroid # # This file is a part of < https://github.com/TeamUltroid/Ultroid/ > # PLease read the GNU Affero General Public License in # . import glob import os import re import time from telethon import Button try: from youtubesearchpython import Playlist, VideosSearch except ImportError: Playlist, VideosSearch = None, None from yt_dlp import YoutubeDL from .. import LOGS, udB from .helper import download_file, humanbytes, run_async, time_formatter from .tools import set_attributes async def ytdl_progress(k, start_time, event): if k["status"] == "error": return await event.edit("error") while k["status"] == "downloading": text = ( f"`Downloading: {k['filename']}\n" + f"Total Size: {humanbytes(k['total_bytes'])}\n" + f"Downloaded: {humanbytes(k['downloaded_bytes'])}\n" + f"Speed: {humanbytes(k['speed'])}/s\n" + f"ETA: {time_formatter(k['eta']*1000)}`" ) if round((time.time() - start_time) % 10.0) == 0: try: await event.edit(text) except Exception as ex: LOGS.error(f"ytdl_progress: {ex}") def get_yt_link(query): search = VideosSearch(query, limit=1).result() try: return search["result"][0]["link"] except IndexError: return async def download_yt(event, link, ytd): reply_to = event.reply_to_msg_id or event info = await dler(event, link, ytd, download=True) if not info: return if info.get("_type", None) == "playlist": total = info["playlist_count"] for num, file in enumerate(info["entries"]): num += 1 id_ = file["id"] thumb = id_ + ".jpg" title = file["title"] await download_file( file.get("thumbnail", None) or file["thumbnails"][-1]["url"], thumb ) ext = "." + ytd["outtmpl"]["default"].split(".")[-1] if ext == ".m4a": ext = ".mp3" id = None for x in glob.glob(f"{id_}*"): if not x.endswith("jpg"): id = x if not id: return ext = "." + id.split(".")[-1] file = title + ext try: os.rename(id, file) except FileNotFoundError: try: os.rename(id + ext, file) except FileNotFoundError as er: if os.path.exists(id): file = id else: raise er if file.endswith(".part"): os.remove(file) os.remove(thumb) await event.client.send_message( event.chat_id, f"`[{num}/{total}]` `Invalid Video format.\nIgnoring that...`", ) return attributes = await set_attributes(file) res, _ = await event.client.fast_uploader( file, show_progress=True, event=event, to_delete=True ) from_ = info["extractor"].split(":")[0] caption = f"`[{num}/{total}]` `{title}`\n\n`from {from_}`" await event.client.send_file( event.chat_id, file=res, caption=caption, attributes=attributes, supports_streaming=True, thumb=thumb, reply_to=reply_to, ) os.remove(thumb) try: await event.delete() except BaseException: pass return title = info["title"] if len(title) > 20: title = title[:17] + "..." id_ = info["id"] thumb = id_ + ".jpg" await download_file( info.get("thumbnail", None) or f"https://i.ytimg.com/vi/{id_}/hqdefault.jpg", thumb, ) ext = "." + ytd["outtmpl"]["default"].split(".")[-1] for _ext in [".m4a", ".mp3", ".opus"]: if ext == _ext: ext = _ext break id = None for x in glob.glob(f"{id_}*"): if not x.endswith("jpg"): id = x if not id: return ext = "." + id.split(".")[-1] file = title + ext try: os.rename(id, file) except FileNotFoundError: os.rename(id + ext, file) attributes = await set_attributes(file) res, _ = await event.client.fast_uploader( file, show_progress=True, event=event, to_delete=True ) caption = f"`{info['title']}`" await event.client.send_file( event.chat_id, file=res, caption=caption, attributes=attributes, supports_streaming=True, thumb=thumb, reply_to=reply_to, ) os.remove(thumb) try: await event.delete() except BaseException: pass # ---------------YouTube Downloader Inline--------------- # @New-Dev0 @buddhhu @1danish-00 def get_formats(type, id, data): if type == "audio": audio = [] for _quality in ["64", "128", "256", "320"]: _audio = {} _audio.update( { "ytid": id, "type": "audio", "id": _quality, "quality": _quality + "KBPS", } ) audio.append(_audio) return audio if type == "video": video = [] size = 0 for vid in data["formats"]: if vid["format_id"] == "251": size += vid["filesize"] if vid.get("filesize") else 0 if vid["vcodec"] != "none": _id = int(vid["format_id"]) _quality = str(vid["width"]) + "×" + str(vid["height"]) _size = size + (vid["filesize"] if vid.get("filesize") else 0) _ext = "mkv" if vid["ext"] == "webm" else "mp4" if _size < 2147483648: # Telegram's Limit of 2GB _video = {} _video.update( { "ytid": id, "type": "video", "id": str(_id) + "+251", "quality": _quality, "size": _size, "ext": _ext, } ) video.append(_video) return video return [] def get_buttons(listt): id = listt[0]["ytid"] butts = [ Button.inline( text=f"[{x['quality']}" + (f" {humanbytes(x['size'])}]" if x.get("size") else "]"), data=f"ytdownload:{x['type']}:{x['id']}:{x['ytid']}" + (f":{x['ext']}" if x.get("ext") else ""), ) for x in listt ] buttons = list(zip(butts[::2], butts[1::2])) if len(butts) % 2 == 1: buttons.append((butts[-1],)) buttons.append([Button.inline("« Back", f"ytdl_back:{id}")]) return buttons async def dler(event, url, opts: dict = {}, download=False): await event.edit("`Getting Data...`") if "quiet" not in opts: opts["quiet"] = True opts["username"] = udB.get_key("YT_USERNAME") opts["password"] = udB.get_key("YT_PASSWORD") if download: await ytdownload(url, opts) try: return await extract_info(url, opts) except Exception as e: await event.edit(f"{type(e)}: {e}") return @run_async def ytdownload(url, opts): try: return YoutubeDL(opts).download([url]) except Exception as ex: LOGS.error(ex) @run_async def extract_info(url, opts): return YoutubeDL(opts).extract_info(url=url, download=False) @run_async def get_videos_link(url): to_return = [] regex = re.search(r"\?list=([(\w+)\-]*)", url) if not regex: return to_return playlist_id = regex.group(1) videos = Playlist(playlist_id) for vid in videos.videos: link = re.search(r"\?v=([(\w+)\-]*)", vid["link"]).group(1) to_return.append(f"https://youtube.com/watch?v={link}") return to_return