|
import os |
|
import time |
|
|
|
import requests |
|
from pyrogram.types import * |
|
from pyrogram import Client, filters |
|
from youtube_search import YoutubeSearch |
|
from yt_dlp import YoutubeDL |
|
|
|
from akn.utils.database import db |
|
from akn.utils.driver import YoutubeDriver |
|
from akn.utils.formatter import secs_to_mins |
|
from akn.utils.logger import LOGS |
|
from akn.utils.scripts import progress |
|
|
|
# Custom-emoji tag that handlers prepend to status messages when
# `client.me.is_premium` is true; the inner character is the plain fallback.
# NOTE(review): fallback glyph reconstructed from mis-decoded bytes
# (F0 9F 97 BF -> U+1F5FF) — confirm against the upstream file.
custom_loading = "<emoji id=5974235702701853774>🗿</emoji>"

# Help text sent in reply to /start.
COMMAND_LIST = """
/yta: Download the youtube link video in .mp3 format!.
/ytv: Download the youtube link video in .mp4 format!.
/ytsa: Download the youtube search video in .mp3 format!.
/ytva: Download the youtube search video in .mp4 format!
/ytlink: Search for a video on youtube
"""
|
|
|
async def input_user(message: "Message") -> str:
    """Return the text that follows the command, or "" when there is none.

    The first whitespace-delimited token (the command itself) is dropped and
    the remainder is stripped of surrounding whitespace.

    Args:
        message: Incoming message; `command`, `text` and `caption` are read.

    Returns:
        The argument string, or an empty string if the command has no
        arguments.
    """
    if len(message.command) < 2:
        return ""
    # A command may arrive as a media caption, in which case `text` is None.
    raw = message.text or message.caption or ""
    # Split on any whitespace run: a literal " " separator missed arguments
    # that follow the command on a new line or after a tab.
    parts = raw.split(maxsplit=1)
    return parts[1].strip() if len(parts) > 1 else ""
|
|
|
@Client.on_message(filters.command(["start"]) & ~filters.forwarded)
async def startbot(client: Client, message: Message):
    """Answer /start with the command list and a row of two link buttons."""
    developer_button = InlineKeyboardButton(
        text="Developer",
        url="https://t.me/xpushz",
    )
    channel_button = InlineKeyboardButton(
        text="Channel",
        url="https://t.me/RendyProjects",
    )
    # Single keyboard row holding both buttons side by side.
    keyboard = InlineKeyboardMarkup([[developer_button, channel_button]])
    await message.reply_text(
        text=COMMAND_LIST,
        disable_web_page_preview=True,
        reply_markup=keyboard,
    )
|
|
|
@Client.on_message(filters.command(["ytsa"]) & ~filters.forwarded)
async def youtube_search_audio(client: Client, message: Message):
    """Search YouTube for the given text and send the first hit as an .mp3.

    Replies with a usage hint when no search text is supplied and with
    "No results found." when the search returns nothing. Any failure while
    downloading or uploading is reported by editing the status message.
    The temporary thumbnail and audio files are removed on both the success
    and the failure path.
    """
    if len(message.command) < 2:
        return await message.reply_text(
            "Give a valid youtube search to download audio."
        )
    query = await input_user(message)
    results = YoutubeSearch(query, max_results=5).to_dict()
    if not results:
        # Indexing results[0] on an empty list raised an unhandled IndexError.
        return await message.reply_text("No results found.")
    # Keep the whole suffix; taking only the first path segment broke
    # multi-segment suffixes such as "/shorts/<id>".
    suffix = results[0]["url_suffix"].lstrip("/")
    video_url = f"https://youtube.com/{suffix}"
    pro = await message.reply_text("Checking ...")
    # NOTE(review): the first value returned by check_url is discarded —
    # presumably a validity flag; confirm whether it should be honoured.
    _, url = YoutubeDriver.check_url(video_url)

    # The custom emoji prefix is only used for premium accounts.
    loading = custom_loading if client.me.is_premium else ""
    await pro.edit_text(f"{loading}__Downloading audio ...__")

    yt_file = None  # set once the target filename is known; drives cleanup
    try:
        with YoutubeDL(YoutubeDriver.song_options()) as ytdl:
            # Fetch metadata without downloading, derive the output name,
            # then let process_info perform the actual download.
            yt_data = ytdl.extract_info(url, False)
            yt_file = ytdl.prepare_filename(yt_data)
            ytdl.process_info(yt_data)
        # NOTE(review): decorative glyphs in the strings below were
        # reconstructed from mis-decoded text — confirm against upstream.
        upload_text = (
            f"**{loading}𝖴𝗉𝗅𝗈𝖺𝖽𝗂𝗇𝗀 𝖲𝗈𝗇𝗀 ...** \n\n"
            f"**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n"
            f"**𝖢𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
        )
        await pro.edit_text(upload_text)
        # A timeout keeps a stalled thumbnail request from hanging the handler.
        response = requests.get(
            f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg",
            timeout=15,
        )
        with open(f"{yt_file}.jpg", "wb") as f:
            f.write(response.content)
        await message.reply_audio(
            f"{yt_file}.mp3",
            caption=(
                f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n"
                f"**👀 𝖵𝗂𝖾𝗐𝗌:** `{yt_data['view_count']}` \n"
                f"**⌛ 𝖣𝗎𝗋𝖺𝗍𝗂𝗈𝗇:** `{secs_to_mins(int(yt_data['duration']))}`"
            ),
            duration=int(yt_data["duration"]),
            performer="[Akeno UB]",
            title=yt_data["title"],
            thumb=f"{yt_file}.jpg",
            progress=progress,
            progress_args=(
                pro,
                time.time(),
                upload_text,
            ),
        )
        await pro.delete()
    except Exception as e:
        return await pro.edit_text(f"**🍀 Audio not Downloaded:** `{e}`")
    finally:
        # Runs on failure too, so partial downloads are not left on disk.
        if yt_file is not None:
            for ext in (".jpg", ".mp3"):
                try:
                    os.remove(f"{yt_file}{ext}")
                except OSError:
                    # Best effort: the file may never have been created.
                    pass
|
|
|
@Client.on_message(filters.command(["yta"]) & ~filters.forwarded)
async def youtube_audio(client: Client, message: Message):
    """Download the given YouTube link and send it as an .mp3.

    Replies with a usage hint when no link is supplied. Any failure while
    downloading or uploading is reported by editing the status message.
    The temporary thumbnail and audio files are removed on both the success
    and the failure path.
    """
    if len(message.command) < 2:
        return await message.reply_text(
            "Give a valid youtube link to download audio."
        )
    query = await input_user(message)
    pro = await message.reply_text("Checking ...")
    # NOTE(review): the first value returned by check_url is discarded —
    # presumably a validity flag; confirm whether it should be honoured.
    _, url = YoutubeDriver.check_url(query)

    # The custom emoji prefix is only used for premium accounts.
    loading = custom_loading if client.me.is_premium else ""
    await pro.edit_text(f"{loading}__Downloading audio ...__")

    yt_file = None  # set once the target filename is known; drives cleanup
    try:
        with YoutubeDL(YoutubeDriver.song_options()) as ytdl:
            # Fetch metadata without downloading, derive the output name,
            # then let process_info perform the actual download.
            yt_data = ytdl.extract_info(url, False)
            yt_file = ytdl.prepare_filename(yt_data)
            ytdl.process_info(yt_data)
        # NOTE(review): decorative glyphs in the strings below were
        # reconstructed from mis-decoded text — confirm against upstream.
        upload_text = (
            f"**{loading}𝖴𝗉𝗅𝗈𝖺𝖽𝗂𝗇𝗀 𝖲𝗈𝗇𝗀 ...** \n\n"
            f"**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n"
            f"**𝖢𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
        )
        await pro.edit_text(upload_text)
        # A timeout keeps a stalled thumbnail request from hanging the handler.
        response = requests.get(
            f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg",
            timeout=15,
        )
        with open(f"{yt_file}.jpg", "wb") as f:
            f.write(response.content)
        await message.reply_audio(
            f"{yt_file}.mp3",
            caption=(
                f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n"
                f"**👀 𝖵𝗂𝖾𝗐𝗌:** `{yt_data['view_count']}` \n"
                f"**⌛ 𝖣𝗎𝗋𝖺𝗍𝗂𝗈𝗇:** `{secs_to_mins(int(yt_data['duration']))}`"
            ),
            duration=int(yt_data["duration"]),
            performer="[Akeno UB]",
            title=yt_data["title"],
            thumb=f"{yt_file}.jpg",
            progress=progress,
            progress_args=(
                pro,
                time.time(),
                upload_text,
            ),
        )
        await pro.delete()
    except Exception as e:
        return await pro.edit_text(f"**🍀 Audio not Downloaded:** `{e}`")
    finally:
        # Runs on failure too, so partial downloads are not left on disk.
        if yt_file is not None:
            for ext in (".jpg", ".mp3"):
                try:
                    os.remove(f"{yt_file}{ext}")
                except OSError:
                    # Best effort: the file may never have been created.
                    pass
|
|
|
@Client.on_message(filters.command(["ytva"]) & ~filters.forwarded)
async def ytvideo_search(client: Client, message: Message):
    """Search YouTube for the given text and send the first hit as an .mp4.

    Replies with a usage hint when no search text is supplied and with
    "No results found." when the search returns nothing. Any failure while
    downloading or uploading is reported by editing the status message.
    The temporary thumbnail and video files are removed on both the success
    and the failure path.
    """
    if len(message.command) < 2:
        return await message.reply_text(
            "Give a valid youtube search to download video."
        )
    query = await input_user(message)
    results = YoutubeSearch(query, max_results=5).to_dict()
    if not results:
        # Indexing results[0] on an empty list raised an unhandled IndexError.
        return await message.reply_text("No results found.")
    # Keep the whole suffix; taking only the first path segment broke
    # multi-segment suffixes such as "/shorts/<id>".
    suffix = results[0]["url_suffix"].lstrip("/")
    video_url = f"https://youtube.com/{suffix}"
    pro = await message.reply_text("Checking ...")
    # NOTE(review): the first value returned by check_url is discarded —
    # presumably a validity flag; confirm whether it should be honoured.
    _, url = YoutubeDriver.check_url(video_url)

    # The custom emoji prefix is only used for premium accounts.
    loading = custom_loading if client.me.is_premium else ""
    # Status texts previously said "audio"/"Song" although a video is sent.
    await pro.edit_text(f"{loading}__Downloading video ...__")

    yt_file = None  # set once the video id is known; drives cleanup
    try:
        with YoutubeDL(YoutubeDriver.video_options()) as ytdl:
            # Second argument True: extract metadata and download in one go.
            yt_data = ytdl.extract_info(url, True)
            # Local files are addressed by video id ("<id>.mp4", "<id>.jpg").
            yt_file = yt_data["id"]
        # NOTE(review): decorative glyphs in the strings below were
        # reconstructed from mis-decoded text — confirm against upstream.
        upload_text = (
            f"**{loading}𝖴𝗉𝗅𝗈𝖺𝖽𝗂𝗇𝗀 𝖵𝗂𝖽𝖾𝗈 ...** \n\n"
            f"**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n"
            f"**𝖢𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
        )
        await pro.edit_text(upload_text)
        # A timeout keeps a stalled thumbnail request from hanging the handler.
        response = requests.get(
            f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg",
            timeout=15,
        )
        with open(f"{yt_file}.jpg", "wb") as f:
            f.write(response.content)
        await message.reply_video(
            f"{yt_file}.mp4",
            caption=(
                f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n"
                f"**👀 𝖵𝗂𝖾𝗐𝗌:** `{yt_data['view_count']}` \n"
                f"**⌛ 𝖣𝗎𝗋𝖺𝗍𝗂𝗈𝗇:** `{secs_to_mins(int(yt_data['duration']))}`"
            ),
            duration=int(yt_data["duration"]),
            thumb=f"{yt_file}.jpg",
            progress=progress,
            progress_args=(
                pro,
                time.time(),
                upload_text,
            ),
        )
        await pro.delete()
    except Exception as e:
        return await pro.edit_text(f"**🍀 Video not Downloaded:** `{e}`")
    finally:
        # Runs on failure too, so partial downloads are not left on disk.
        if yt_file is not None:
            for ext in (".jpg", ".mp4"):
                try:
                    os.remove(f"{yt_file}{ext}")
                except OSError:
                    # Best effort: the file may never have been created.
                    pass
|
|
|
@Client.on_message(filters.command(["ytv"]) & ~filters.forwarded)
async def ytvideo(client: Client, message: Message):
    """Download the given YouTube link and send it as an .mp4.

    Replies with a usage hint when no link is supplied. Any failure while
    downloading or uploading is reported by editing the status message.
    The temporary thumbnail and video files are removed on both the success
    and the failure path.
    """
    if len(message.command) < 2:
        return await message.reply_text(
            "Give a valid youtube link to download video."
        )
    query = await input_user(message)
    pro = await message.reply_text("Checking ...")
    # NOTE(review): the first value returned by check_url is discarded —
    # presumably a validity flag; confirm whether it should be honoured.
    _, url = YoutubeDriver.check_url(query)

    # The custom emoji prefix is only used for premium accounts.
    loading = custom_loading if client.me.is_premium else ""
    # Status texts previously said "audio"/"Song" although a video is sent.
    await pro.edit_text(f"{loading}__Downloading video ...__")

    yt_file = None  # set once the video id is known; drives cleanup
    try:
        with YoutubeDL(YoutubeDriver.video_options()) as ytdl:
            # Second argument True: extract metadata and download in one go.
            yt_data = ytdl.extract_info(url, True)
            # Local files are addressed by video id ("<id>.mp4", "<id>.jpg").
            yt_file = yt_data["id"]
        # NOTE(review): decorative glyphs in the strings below were
        # reconstructed from mis-decoded text — confirm against upstream.
        upload_text = (
            f"**{loading}𝖴𝗉𝗅𝗈𝖺𝖽𝗂𝗇𝗀 𝖵𝗂𝖽𝖾𝗈 ...** \n\n"
            f"**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n"
            f"**𝖢𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
        )
        await pro.edit_text(upload_text)
        # A timeout keeps a stalled thumbnail request from hanging the handler.
        response = requests.get(
            f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg",
            timeout=15,
        )
        with open(f"{yt_file}.jpg", "wb") as f:
            f.write(response.content)
        await message.reply_video(
            f"{yt_file}.mp4",
            caption=(
                f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n"
                f"**👀 𝖵𝗂𝖾𝗐𝗌:** `{yt_data['view_count']}` \n"
                f"**⌛ 𝖣𝗎𝗋𝖺𝗍𝗂𝗈𝗇:** `{secs_to_mins(int(yt_data['duration']))}`"
            ),
            duration=int(yt_data["duration"]),
            thumb=f"{yt_file}.jpg",
            progress=progress,
            progress_args=(
                pro,
                time.time(),
                upload_text,
            ),
        )
        await pro.delete()
    except Exception as e:
        return await pro.edit_text(f"**🍀 Video not Downloaded:** `{e}`")
    finally:
        # Runs on failure too, so partial downloads are not left on disk.
        if yt_file is not None:
            for ext in (".jpg", ".mp4"):
                try:
                    os.remove(f"{yt_file}{ext}")
                except OSError:
                    # Best effort: the file may never have been created.
                    pass
|
|
|
@Client.on_message(filters.command(["ytlink"]) & ~filters.forwarded)
async def ytlink(_, message: Message):
    """Search YouTube for the given text and list the matches with links.

    Replies with a usage hint when no search text is supplied. A search
    error, or an empty result set, is reported by editing the status message.
    """
    if len(message.command) < 2:
        return await message.reply_text("Give something to search on youtube.")
    query = await input_user(message)
    pro = await message.reply_text("Searching ...")
    try:
        results = YoutubeDriver(query, 7).to_dict()
    except Exception as e:
        # NOTE(review): leading emoji reconstructed from mis-decoded text —
        # confirm against upstream.
        return await pro.edit_text(f"**🍀 Error:** `{e}`")
    if not results:
        return await pro.edit_text("No results found.")

    # NOTE(review): decorative glyphs in the strings below were reconstructed
    # from mis-decoded text — confirm against upstream.
    header = f"**🔎 𝖳𝗈𝗍𝖺𝗅 𝖱𝖾𝗌𝗎𝗅𝗍𝗌 𝖥𝗈𝗎𝗇𝖽:** `{len(results)}`\n\n"
    # One formatted entry per result, joined once instead of repeated `+=`.
    entries = [
        f"**𝖳𝗂𝗍𝗅𝖾:** `{result['title'][:50]}`\n"
        f"**𝖢𝗁𝖺𝗇𝗇𝖾𝗅:** `{result['channel']}`\n"
        f"**𝖵𝗂𝖾𝗐𝗌:** `{result['views']}`\n"
        f"**𝖣𝗎𝗋𝖺𝗍𝗂𝗈𝗇:** `{result['duration']}`\n"
        f"**𝖫𝗂𝗇𝗄:** `https://youtube.com{result['url_suffix']}`\n\n"
        for result in results
    ]
    await pro.edit_text(header + "".join(entries), disable_web_page_preview=True)
|
|