akn-dev / akn /Youtube /youtube.py
randydev's picture
fix revert back and update
21bc372
import os
import time
import requests
from pyrogram.types import *
from pyrogram import Client, filters
from youtube_search import YoutubeSearch
from yt_dlp import YoutubeDL
from akn.utils.database import db
from akn.utils.driver import YoutubeDriver
from akn.utils.formatter import secs_to_mins
from akn.utils.logger import LOGS
from akn.utils.scripts import progress
custom_loading = "<emoji id=5974235702701853774>πŸ—Ώ</emoji>"
COMMAND_LIST = """
/yta: Download the youtube link video in .mp3 format!.
/ytv: Download the youtube link video in .mp4 format!.
/ytsa: Download the youtube search video in .mp3 format!.
/ytva: Download the youtube search video in .mp4 format!
/ytlink: Search for a video on youtube
"""
async def input_user(message: Message) -> str:
"""Get the input from the user"""
if len(message.command) < 2:
output = ""
else:
try:
output = message.text.split(" ", 1)[1].strip() or ""
except IndexError:
output = ""
return output
@Client.on_message(
filters.command(["start"])
& ~filters.forwarded
)
async def startbot(client: Client, message: Message):
buttons = [
[
InlineKeyboardButton(
text="Developer",
url=f"https://t.me/xpushz"
),
InlineKeyboardButton(
text="Channel",
url='https://t.me/RendyProjects'
),
]
]
await message.reply_text(
text=COMMAND_LIST,
disable_web_page_preview=True,
reply_markup=InlineKeyboardMarkup(buttons)
)
@Client.on_message(
filters.command(["ytsa"])
& ~filters.forwarded
)
async def youtube_search_audio(client: Client, message: Message):
if len(message.command) < 2:
return await message.reply_text(
"Give a valid youtube search to download audio."
)
query = await input_user(message)
results = YoutubeSearch(query, max_results=5).to_dict()
watch = results[0]["url_suffix"]
url_suffix = watch.split("/")[1]
okk = f"https://youtube.com/{url_suffix}"
pro = await message.reply_text("Checking ...")
_, url = YoutubeDriver.check_url(okk)
#if not status:
# return await pro.edit_text(url)
if client.me.is_premium:
await pro.edit_text(f"{custom_loading}__Downloading audio ...__")
else:
await pro.edit_text(f"__Downloading audio ...__")
try:
with YoutubeDL(YoutubeDriver.song_options()) as ytdl:
yt_data = ytdl.extract_info(url, False)
yt_file = ytdl.prepare_filename(yt_data)
ytdl.process_info(yt_data)
if client.me.is_premium:
upload_text = f"**{custom_loading}π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
else:
upload_text = f"**π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
await pro.edit_text(upload_text)
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg")
with open(f"{yt_file}.jpg", "wb") as f:
f.write(response.content)
await message.reply_audio(
f"{yt_file}.mp3",
caption=f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n**πŸ‘€ π–΅π—‚π–Ύπ—π—Œ:** `{yt_data['view_count']}` \n**βŒ› π–£π—Žπ—‹π–Ίπ—π—‚π—ˆπ—‡:** `{secs_to_mins(int(yt_data['duration']))}`",
duration=int(yt_data["duration"]),
performer="[Akeno UB]",
title=yt_data["title"],
thumb=f"{yt_file}.jpg",
progress=progress,
progress_args=(
pro,
time.time(),
upload_text,
),
)
await pro.delete()
except Exception as e:
return await pro.edit_text(f"**πŸ€ Audio not Downloaded:** `{e}`")
try:
os.remove(f"{yt_file}.jpg")
os.remove(f"{yt_file}.mp3")
except:
pass
@Client.on_message(
filters.command(["yta"])
& ~filters.forwarded
)
async def youtube_audio(client: Client, message: Message):
if len(message.command) < 2:
return await message.reply_text(
"Give a valid youtube link to download audio."
)
query = await input_user(message)
pro = await message.reply_text("Checking ...")
_, url = YoutubeDriver.check_url(query)
#if not status:
# return await pro.edit_text(url)
if client.me.is_premium:
await pro.edit_text(f"{custom_loading}__Downloading audio ...__")
else:
await pro.edit_text(f"__Downloading audio ...__")
try:
with YoutubeDL(YoutubeDriver.song_options()) as ytdl:
yt_data = ytdl.extract_info(url, False)
yt_file = ytdl.prepare_filename(yt_data)
ytdl.process_info(yt_data)
if client.me.is_premium:
upload_text = f"**{custom_loading}π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
else:
upload_text = f"**π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
await pro.edit_text(upload_text)
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg")
with open(f"{yt_file}.jpg", "wb") as f:
f.write(response.content)
await message.reply_audio(
f"{yt_file}.mp3",
caption=f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n**πŸ‘€ π–΅π—‚π–Ύπ—π—Œ:** `{yt_data['view_count']}` \n**βŒ› π–£π—Žπ—‹π–Ίπ—π—‚π—ˆπ—‡:** `{secs_to_mins(int(yt_data['duration']))}`",
duration=int(yt_data["duration"]),
performer="[Akeno UB]",
title=yt_data["title"],
thumb=f"{yt_file}.jpg",
progress=progress,
progress_args=(
pro,
time.time(),
upload_text,
),
)
await pro.delete()
except Exception as e:
return await pro.edit_text(f"**πŸ€ Audio not Downloaded:** `{e}`")
try:
os.remove(f"{yt_file}.jpg")
os.remove(f"{yt_file}.mp3")
except:
pass
@Client.on_message(
filters.command(["ytva"])
& ~filters.forwarded
)
async def ytvideo_search(client: Client, message: Message):
if len(message.command) < 2:
return await message.reply_text(
"Give a valid youtube search to download video."
)
query = await input_user(message)
results = YoutubeSearch(query, max_results=5).to_dict()
watch = results[0]["url_suffix"]
url_suffix = watch.split("/")[1]
okk = f"https://youtube.com/{url_suffix}"
pro = await message.reply_text("Checking ...")
_, url = YoutubeDriver.check_url(okk)
#if not status:
# return await pro.edit_text(url)
if client.me.is_premium:
await pro.edit_text(f"{custom_loading}__Downloading audio ...__")
else:
await pro.edit_text(f"__Downloading audio ...__")
try:
with YoutubeDL(YoutubeDriver.video_options()) as ytdl:
yt_data = ytdl.extract_info(url, True)
yt_file = yt_data["id"]
if client.me.is_premium:
upload_text = f"**{custom_loading}π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
else:
upload_text = f"**π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
await pro.edit_text(upload_text)
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg")
with open(f"{yt_file}.jpg", "wb") as f:
f.write(response.content)
await message.reply_video(
f"{yt_file}.mp4",
caption=f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n**πŸ‘€ π–΅π—‚π–Ύπ—π—Œ:** `{yt_data['view_count']}` \n**βŒ› π–£π—Žπ—‹π–Ίπ—π—‚π—ˆπ—‡:** `{secs_to_mins(int(yt_data['duration']))}`",
duration=int(yt_data["duration"]),
thumb=f"{yt_file}.jpg",
progress=progress,
progress_args=(
pro,
time.time(),
upload_text,
),
)
await pro.delete()
except Exception as e:
return await pro.edit_text(f"**πŸ€ Video not Downloaded:** `{e}`")
try:
os.remove(f"{yt_file}.jpg")
os.remove(f"{yt_file}.mp4")
except:
pass
@Client.on_message(
filters.command(["ytv"])
& ~filters.forwarded
)
async def ytvideo(client: Client, message: Message):
if len(message.command) < 2:
return await message.reply_text(
"Give a valid youtube link to download video."
)
query = await input_user(message)
pro = await message.reply_text("Checking ...")
_, url = YoutubeDriver.check_url(query)
#if not status:
# return await pro.edit_text(url)
if client.me.is_premium:
await pro.edit_text(f"{custom_loading}__Downloading audio ...__")
else:
await pro.edit_text(f"__Downloading audio ...__")
try:
with YoutubeDL(YoutubeDriver.video_options()) as ytdl:
yt_data = ytdl.extract_info(url, True)
yt_file = yt_data["id"]
if client.me.is_premium:
upload_text = f"**{custom_loading}π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
else:
upload_text = f"**π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
await pro.edit_text(upload_text)
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg")
with open(f"{yt_file}.jpg", "wb") as f:
f.write(response.content)
await message.reply_video(
f"{yt_file}.mp4",
caption=f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n**πŸ‘€ π–΅π—‚π–Ύπ—π—Œ:** `{yt_data['view_count']}` \n**βŒ› π–£π—Žπ—‹π–Ίπ—π—‚π—ˆπ—‡:** `{secs_to_mins(int(yt_data['duration']))}`",
duration=int(yt_data["duration"]),
thumb=f"{yt_file}.jpg",
progress=progress,
progress_args=(
pro,
time.time(),
upload_text,
),
)
await pro.delete()
except Exception as e:
return await pro.edit_text(f"**πŸ€ Video not Downloaded:** `{e}`")
try:
os.remove(f"{yt_file}.jpg")
os.remove(f"{yt_file}.mp4")
except:
pass
@Client.on_message(
filters.command(["ytlink"])
& ~filters.forwarded
)
async def ytlink(_, message: Message):
if len(message.command) < 2:
return await message.reply_text("Give something to search on youtube.")
query = await input_user(message)
pro = await message.reply_text("Searching ...")
try:
results = YoutubeDriver(query, 7).to_dict()
except Exception as e:
return await pro.edit_text(f"**πŸ€ Error:** `{e}`")
if not results:
return await pro.edit_text("No results found.")
text = f"**πŸ”Ž π–³π—ˆπ—π–Ίπ—… π–±π–Ύπ—Œπ—Žπ—…π—π—Œ π–₯π—ˆπ—Žπ—‡π–½:** `{len(results)}`\n\n"
for result in results:
text += f"**𝖳𝗂𝗍𝗅𝖾:** `{result['title'][:50]}`\n**𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{result['channel']}`\n**π–΅π—‚π–Ύπ—π—Œ:** `{result['views']}`\n**π–£π—Žπ—‹π–Ίπ—π—‚π—ˆπ—‡:** `{result['duration']}`\n**𝖫𝗂𝗇𝗄:** `https://youtube.com{result['url_suffix']}`\n\n"
await pro.edit_text(text, disable_web_page_preview=True)