|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import glob |
|
import os |
|
import re |
|
import time |
|
|
|
from telethon import Button |
|
|
|
try: |
|
from youtubesearchpython import Playlist, VideosSearch |
|
except ImportError: |
|
Playlist, VideosSearch = None, None |
|
|
|
from yt_dlp import YoutubeDL |
|
|
|
from .. import LOGS, udB |
|
from .helper import download_file, humanbytes, run_async, time_formatter |
|
from .tools import set_attributes |
|
|
|
|
|
async def ytdl_progress(k, start_time, event): |
|
if k["status"] == "error": |
|
return await event.edit("error") |
|
while k["status"] == "downloading": |
|
text = ( |
|
f"`Downloading: {k['filename']}\n" |
|
+ f"Total Size: {humanbytes(k['total_bytes'])}\n" |
|
+ f"Downloaded: {humanbytes(k['downloaded_bytes'])}\n" |
|
+ f"Speed: {humanbytes(k['speed'])}/s\n" |
|
+ f"ETA: {time_formatter(k['eta']*1000)}`" |
|
) |
|
if round((time.time() - start_time) % 10.0) == 0: |
|
try: |
|
await event.edit(text) |
|
except Exception as ex: |
|
LOGS.error(f"ytdl_progress: {ex}") |
|
|
|
|
|
def get_yt_link(query): |
|
search = VideosSearch(query, limit=1).result() |
|
try: |
|
return search["result"][0]["link"] |
|
except IndexError: |
|
return |
|
|
|
|
|
async def download_yt(event, link, ytd): |
|
reply_to = event.reply_to_msg_id or event |
|
info = await dler(event, link, ytd, download=True) |
|
if not info: |
|
return |
|
if info.get("_type", None) == "playlist": |
|
total = info["playlist_count"] |
|
for num, file in enumerate(info["entries"]): |
|
num += 1 |
|
id_ = file["id"] |
|
thumb = id_ + ".jpg" |
|
title = file["title"] |
|
await download_file( |
|
file.get("thumbnail", None) or file["thumbnails"][-1]["url"], thumb |
|
) |
|
ext = "." + ytd["outtmpl"]["default"].split(".")[-1] |
|
if ext == ".m4a": |
|
ext = ".mp3" |
|
id = None |
|
for x in glob.glob(f"{id_}*"): |
|
if not x.endswith("jpg"): |
|
id = x |
|
if not id: |
|
return |
|
ext = "." + id.split(".")[-1] |
|
file = title + ext |
|
try: |
|
os.rename(id, file) |
|
except FileNotFoundError: |
|
try: |
|
os.rename(id + ext, file) |
|
except FileNotFoundError as er: |
|
if os.path.exists(id): |
|
file = id |
|
else: |
|
raise er |
|
if file.endswith(".part"): |
|
os.remove(file) |
|
os.remove(thumb) |
|
await event.client.send_message( |
|
event.chat_id, |
|
f"`[{num}/{total}]` `Invalid Video format.\nIgnoring that...`", |
|
) |
|
return |
|
attributes = await set_attributes(file) |
|
res, _ = await event.client.fast_uploader( |
|
file, show_progress=True, event=event, to_delete=True |
|
) |
|
from_ = info["extractor"].split(":")[0] |
|
caption = f"`[{num}/{total}]` `{title}`\n\n`from {from_}`" |
|
await event.client.send_file( |
|
event.chat_id, |
|
file=res, |
|
caption=caption, |
|
attributes=attributes, |
|
supports_streaming=True, |
|
thumb=thumb, |
|
reply_to=reply_to, |
|
) |
|
os.remove(thumb) |
|
try: |
|
await event.delete() |
|
except BaseException: |
|
pass |
|
return |
|
title = info["title"] |
|
if len(title) > 20: |
|
title = title[:17] + "..." |
|
id_ = info["id"] |
|
thumb = id_ + ".jpg" |
|
await download_file( |
|
info.get("thumbnail", None) or f"https://i.ytimg.com/vi/{id_}/hqdefault.jpg", |
|
thumb, |
|
) |
|
ext = "." + ytd["outtmpl"]["default"].split(".")[-1] |
|
for _ext in [".m4a", ".mp3", ".opus"]: |
|
if ext == _ext: |
|
ext = _ext |
|
break |
|
id = None |
|
for x in glob.glob(f"{id_}*"): |
|
if not x.endswith("jpg"): |
|
id = x |
|
if not id: |
|
return |
|
ext = "." + id.split(".")[-1] |
|
file = title + ext |
|
try: |
|
os.rename(id, file) |
|
except FileNotFoundError: |
|
os.rename(id + ext, file) |
|
attributes = await set_attributes(file) |
|
res, _ = await event.client.fast_uploader( |
|
file, show_progress=True, event=event, to_delete=True |
|
) |
|
caption = f"`{info['title']}`" |
|
await event.client.send_file( |
|
event.chat_id, |
|
file=res, |
|
caption=caption, |
|
attributes=attributes, |
|
supports_streaming=True, |
|
thumb=thumb, |
|
reply_to=reply_to, |
|
) |
|
os.remove(thumb) |
|
try: |
|
await event.delete() |
|
except BaseException: |
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_formats(type, id, data): |
|
if type == "audio": |
|
audio = [] |
|
for _quality in ["64", "128", "256", "320"]: |
|
_audio = {} |
|
_audio.update( |
|
{ |
|
"ytid": id, |
|
"type": "audio", |
|
"id": _quality, |
|
"quality": _quality + "KBPS", |
|
} |
|
) |
|
audio.append(_audio) |
|
return audio |
|
if type == "video": |
|
video = [] |
|
size = 0 |
|
for vid in data["formats"]: |
|
if vid["format_id"] == "251": |
|
size += vid["filesize"] if vid.get("filesize") else 0 |
|
if vid["vcodec"] != "none": |
|
_id = int(vid["format_id"]) |
|
_quality = str(vid["width"]) + "×" + str(vid["height"]) |
|
_size = size + (vid["filesize"] if vid.get("filesize") else 0) |
|
_ext = "mkv" if vid["ext"] == "webm" else "mp4" |
|
if _size < 2147483648: |
|
_video = {} |
|
_video.update( |
|
{ |
|
"ytid": id, |
|
"type": "video", |
|
"id": str(_id) + "+251", |
|
"quality": _quality, |
|
"size": _size, |
|
"ext": _ext, |
|
} |
|
) |
|
video.append(_video) |
|
return video |
|
return [] |
|
|
|
|
|
def get_buttons(listt): |
|
id = listt[0]["ytid"] |
|
butts = [ |
|
Button.inline( |
|
text=f"[{x['quality']}" |
|
+ (f" {humanbytes(x['size'])}]" if x.get("size") else "]"), |
|
data=f"ytdownload:{x['type']}:{x['id']}:{x['ytid']}" |
|
+ (f":{x['ext']}" if x.get("ext") else ""), |
|
) |
|
for x in listt |
|
] |
|
buttons = list(zip(butts[::2], butts[1::2])) |
|
if len(butts) % 2 == 1: |
|
buttons.append((butts[-1],)) |
|
buttons.append([Button.inline("« Back", f"ytdl_back:{id}")]) |
|
return buttons |
|
|
|
|
|
async def dler(event, url, opts: dict = {}, download=False): |
|
await event.edit("`Getting Data...`") |
|
if "quiet" not in opts: |
|
opts["quiet"] = True |
|
opts["username"] = udB.get_key("YT_USERNAME") |
|
opts["password"] = udB.get_key("YT_PASSWORD") |
|
if download: |
|
await ytdownload(url, opts) |
|
try: |
|
return await extract_info(url, opts) |
|
except Exception as e: |
|
await event.edit(f"{type(e)}: {e}") |
|
return |
|
|
|
|
|
@run_async |
|
def ytdownload(url, opts): |
|
try: |
|
return YoutubeDL(opts).download([url]) |
|
except Exception as ex: |
|
LOGS.error(ex) |
|
|
|
|
|
@run_async |
|
def extract_info(url, opts): |
|
return YoutubeDL(opts).extract_info(url=url, download=False) |
|
|
|
|
|
@run_async |
|
def get_videos_link(url): |
|
to_return = [] |
|
regex = re.search(r"\?list=([(\w+)\-]*)", url) |
|
if not regex: |
|
return to_return |
|
playlist_id = regex.group(1) |
|
videos = Playlist(playlist_id) |
|
for vid in videos.videos: |
|
link = re.search(r"\?v=([(\w+)\-]*)", vid["link"]).group(1) |
|
to_return.append(f"https://youtube.com/watch?v={link}") |
|
return to_return |
|
|