akn-dev / akn /Akeno /youtube.py
randydev's picture
fix revert back and update
21bc372
import os
import time
import requests
from pyrogram.types import Message
from pyrogram import Client, filters
from youtube_search import YoutubeSearch
from yt_dlp import YoutubeDL
from akn import log_detailed_error
from akn.utils.database import db
from akn.utils.driver import YoutubeDriver
from akn.utils.formatter import secs_to_mins
from akn.utils.handler import *
from akn.utils.logger import LOGS
from akn.utils.scripts import progress
from akn.utils.prefixprem import command
from config import *
custom_loading = "<emoji id=5974235702701853774>πŸ—Ώ</emoji>"
@Akeno(
~filters.scheduled
& command(["ytsa"])
& filters.me
& ~filters.forwarded
)
async def youtube_search_audio(client: Client, message: Message):
if len(message.command) < 2:
return await message.reply_text(
"Give a valid youtube search to download audio."
)
query = await input_user(message)
results = YoutubeSearch(query, max_results=5).to_dict()
watch = results[0]["url_suffix"]
url_suffix = watch.split("/")[1]
okk = f"https://youtube.com/{url_suffix}"
pro = await message.reply_text("Checking ...")
status, url = YoutubeDriver.check_url(okk)
if not status:
return await pro.edit_text(url)
if client.me.is_premium:
await pro.edit_text(f"{custom_loading}__Downloading audio ...__")
else:
await pro.edit_text(f"__Downloading audio ...__")
try:
with YoutubeDL(YoutubeDriver.song_options()) as ytdl:
yt_data = ytdl.extract_info(url, False)
yt_file = ytdl.prepare_filename(yt_data)
ytdl.process_info(yt_data)
if client.me.is_premium:
upload_text = f"**{custom_loading}π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
else:
upload_text = f"**π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
await pro.edit_text(upload_text)
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg")
with open(f"{yt_file}.jpg", "wb") as f:
f.write(response.content)
await message.reply_audio(
f"{yt_file}.mp3",
caption=f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n**πŸ‘€ π–΅π—‚π–Ύπ—π—Œ:** `{yt_data['view_count']}` \n**βŒ› π–£π—Žπ—‹π–Ίπ—π—‚π—ˆπ—‡:** `{secs_to_mins(int(yt_data['duration']))}`",
duration=int(yt_data["duration"]),
performer="[Akeno UB]",
title=yt_data["title"],
thumb=f"{yt_file}.jpg",
progress=progress,
progress_args=(
pro,
time.time(),
upload_text,
),
)
await pro.delete()
except Exception as e:
await log_detailed_error(e, where=client.me.id, who=message.chat.title)
return await pro.edit_text(f"**πŸ€ Audio not Downloaded:** `{e}`")
try:
os.remove(f"{yt_file}.jpg")
os.remove(f"{yt_file}.mp3")
except:
pass
@Akeno(
~filters.scheduled
& command(["yta"])
& filters.me
& ~filters.forwarded
)
async def youtube_audio(client: Client, message: Message):
if len(message.command) < 2:
return await message.reply_text(
"Give a valid youtube link to download audio."
)
query = await input_user(message)
pro = await message.reply_text("Checking ...")
status, url = YoutubeDriver.check_url(query)
if not status:
return await pro.edit_text(url)
if client.me.is_premium:
await pro.edit_text(f"{custom_loading}__Downloading audio ...__")
else:
await pro.edit_text(f"__Downloading audio ...__")
try:
with YoutubeDL(YoutubeDriver.song_options()) as ytdl:
yt_data = ytdl.extract_info(url, False)
yt_file = ytdl.prepare_filename(yt_data)
ytdl.process_info(yt_data)
if client.me.is_premium:
upload_text = f"**{custom_loading}π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
else:
upload_text = f"**π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
await pro.edit_text(upload_text)
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg")
with open(f"{yt_file}.jpg", "wb") as f:
f.write(response.content)
await message.reply_audio(
f"{yt_file}.mp3",
caption=f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n**πŸ‘€ π–΅π—‚π–Ύπ—π—Œ:** `{yt_data['view_count']}` \n**βŒ› π–£π—Žπ—‹π–Ίπ—π—‚π—ˆπ—‡:** `{secs_to_mins(int(yt_data['duration']))}`",
duration=int(yt_data["duration"]),
performer="[Akeno UB]",
title=yt_data["title"],
thumb=f"{yt_file}.jpg",
progress=progress,
progress_args=(
pro,
time.time(),
upload_text,
),
)
await pro.delete()
except Exception as e:
await log_detailed_error(e, where=client.me.id, who=message.chat.title)
return await pro.edit_text(f"**πŸ€ Audio not Downloaded:** `{e}`")
try:
os.remove(f"{yt_file}.jpg")
os.remove(f"{yt_file}.mp3")
except:
pass
@Akeno(
~filters.scheduled
& command(["ytva"])
& filters.me
& ~filters.forwarded
)
async def ytvideo_search(client: Client, message: Message):
if len(message.command) < 2:
return await message.reply_text(
"Give a valid youtube search to download video."
)
query = await input_user(message)
results = YoutubeSearch(query, max_results=5).to_dict()
watch = results[0]["url_suffix"]
url_suffix = watch.split("/")[1]
okk = f"https://youtube.com/{url_suffix}"
pro = await message.reply_text("Checking ...")
status, url = YoutubeDriver.check_url(okk)
if not status:
return await pro.edit_text(url)
if client.me.is_premium:
await pro.edit_text(f"{custom_loading}__Downloading audio ...__")
else:
await pro.edit_text(f"__Downloading audio ...__")
try:
with YoutubeDL(YoutubeDriver.video_options()) as ytdl:
yt_data = ytdl.extract_info(url, True)
yt_file = yt_data["id"]
if client.me.is_premium:
upload_text = f"**{custom_loading}π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
else:
upload_text = f"**π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
await pro.edit_text(upload_text)
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg")
with open(f"{yt_file}.jpg", "wb") as f:
f.write(response.content)
await message.reply_video(
f"{yt_file}.mp4",
caption=f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n**πŸ‘€ π–΅π—‚π–Ύπ—π—Œ:** `{yt_data['view_count']}` \n**βŒ› π–£π—Žπ—‹π–Ίπ—π—‚π—ˆπ—‡:** `{secs_to_mins(int(yt_data['duration']))}`",
duration=int(yt_data["duration"]),
thumb=f"{yt_file}.jpg",
progress=progress,
progress_args=(
pro,
time.time(),
upload_text,
),
)
await pro.delete()
except Exception as e:
await log_detailed_error(e, where=client.me.id, who=message.chat.title)
return await pro.edit_text(f"**πŸ€ Video not Downloaded:** `{e}`")
try:
os.remove(f"{yt_file}.jpg")
os.remove(f"{yt_file}.mp4")
except:
pass
@Akeno(
~filters.scheduled
& command(["ytv"])
& filters.me
& ~filters.forwarded
)
async def ytvideo(client: Client, message: Message):
if len(message.command) < 2:
return await message.reply_text(
"Give a valid youtube link to download video."
)
query = await input_user(message)
pro = await message.reply_text("Checking ...")
status, url = YoutubeDriver.check_url(query)
if not status:
return await pro.edit_text(url)
if client.me.is_premium:
await pro.edit_text(f"{custom_loading}__Downloading audio ...__")
else:
await pro.edit_text(f"__Downloading audio ...__")
try:
with YoutubeDL(YoutubeDriver.video_options()) as ytdl:
yt_data = ytdl.extract_info(url, True)
yt_file = yt_data["id"]
if client.me.is_premium:
upload_text = f"**{custom_loading}π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
else:
upload_text = f"**π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
await pro.edit_text(upload_text)
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg")
with open(f"{yt_file}.jpg", "wb") as f:
f.write(response.content)
await message.reply_video(
f"{yt_file}.mp4",
caption=f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n**πŸ‘€ π–΅π—‚π–Ύπ—π—Œ:** `{yt_data['view_count']}` \n**βŒ› π–£π—Žπ—‹π–Ίπ—π—‚π—ˆπ—‡:** `{secs_to_mins(int(yt_data['duration']))}`",
duration=int(yt_data["duration"]),
thumb=f"{yt_file}.jpg",
progress=progress,
progress_args=(
pro,
time.time(),
upload_text,
),
)
await pro.delete()
except Exception as e:
await log_detailed_error(e, where=client.me.id, who=message.chat.title)
return await pro.edit_text(f"**πŸ€ Video not Downloaded:** `{e}`")
try:
os.remove(f"{yt_file}.jpg")
os.remove(f"{yt_file}.mp4")
except:
pass
@Akeno(
~filters.scheduled
& command(["ytlink"])
& filters.me
& ~filters.forwarded
)
async def ytlink(_, message: Message):
if len(message.command) < 2:
return await message.reply_text("Give something to search on youtube.")
query = await input_user(message)
pro = await message.reply_text("Searching ...")
try:
results = YoutubeDriver(query, 7).to_dict()
except Exception as e:
await log_detailed_error(e, where=client.me.id, who=message.chat.title)
return await pro.edit_text(f"**πŸ€ Error:** `{e}`")
if not results:
return await pro.edit_text("No results found.")
text = f"**πŸ”Ž π–³π—ˆπ—π–Ίπ—… π–±π–Ύπ—Œπ—Žπ—…π—π—Œ π–₯π—ˆπ—Žπ—‡π–½:** `{len(results)}`\n\n"
for result in results:
text += f"**𝖳𝗂𝗍𝗅𝖾:** `{result['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{result['channel']}`\n**π–΅π—‚π–Ύπ—π—Œ:** `{result['views']}`\n**π–£π—Žπ—‹π–Ίπ—π—‚π—ˆπ—‡:** `{result['duration']}`\n**𝖫𝗂𝗇𝗄:** `https://youtube.com{result['url_suffix']}`\n\n"
await pro.edit_text(text, disable_web_page_preview=True)
module = modules_help.add_module("youtube", __file__)
module.add_command("yta", "Download the youtube link video in .mp3 format!.")
module.add_command("ytv", "Download the youtube link video in .mp3 format!.")
module.add_command("ytsa", "Download the youtube search video in .mp3 format!.")
module.add_command("ytva", "Download the youtube search video in .mp4 format!")
module.add_command("ytlink", "Search for a video on youtube")