|
import os |
|
import time |
|
|
|
import requests |
|
from pyrogram.types import Message |
|
from pyrogram import Client, filters |
|
from youtube_search import YoutubeSearch |
|
from yt_dlp import YoutubeDL |
|
|
|
from akn import log_detailed_error |
|
from akn.utils.database import db |
|
from akn.utils.driver import YoutubeDriver |
|
from akn.utils.formatter import secs_to_mins |
|
from akn.utils.handler import * |
|
from akn.utils.logger import LOGS |
|
from akn.utils.scripts import progress |
|
from akn.utils.prefixprem import command |
|
from config import * |
|
|
|
custom_loading = "<emoji id=5974235702701853774>πΏ</emoji>" |
|
|
|
@Akeno( |
|
~filters.scheduled |
|
& command(["ytsa"]) |
|
& filters.me |
|
& ~filters.forwarded |
|
) |
|
async def youtube_search_audio(client: Client, message: Message): |
|
if len(message.command) < 2: |
|
return await message.reply_text( |
|
"Give a valid youtube search to download audio." |
|
) |
|
query = await input_user(message) |
|
results = YoutubeSearch(query, max_results=5).to_dict() |
|
watch = results[0]["url_suffix"] |
|
url_suffix = watch.split("/")[1] |
|
okk = f"https://youtube.com/{url_suffix}" |
|
pro = await message.reply_text("Checking ...") |
|
status, url = YoutubeDriver.check_url(okk) |
|
if not status: |
|
return await pro.edit_text(url) |
|
if client.me.is_premium: |
|
await pro.edit_text(f"{custom_loading}__Downloading audio ...__") |
|
else: |
|
await pro.edit_text(f"__Downloading audio ...__") |
|
try: |
|
with YoutubeDL(YoutubeDriver.song_options()) as ytdl: |
|
yt_data = ytdl.extract_info(url, False) |
|
yt_file = ytdl.prepare_filename(yt_data) |
|
ytdl.process_info(yt_data) |
|
if client.me.is_premium: |
|
upload_text = f"**{custom_loading}π΄ππ
ππΊπ½πππ π²πππ ...** \n\n**π³πππ
πΎ:** `{yt_data['title'][:50]}`\n**π’ππΊπππΎπ
:** `{yt_data['channel']}`" |
|
else: |
|
upload_text = f"**π΄ππ
ππΊπ½πππ π²πππ ...** \n\n**π³πππ
πΎ:** `{yt_data['title'][:50]}`\n**π’ππΊπππΎπ
:** `{yt_data['channel']}`" |
|
await pro.edit_text(upload_text) |
|
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg") |
|
with open(f"{yt_file}.jpg", "wb") as f: |
|
f.write(response.content) |
|
await message.reply_audio( |
|
f"{yt_file}.mp3", |
|
caption=f"**π§ π³πππ
πΎ:** {yt_data['title']} \n\n**π π΅ππΎππ:** `{yt_data['view_count']}` \n**β π£πππΊππππ:** `{secs_to_mins(int(yt_data['duration']))}`", |
|
duration=int(yt_data["duration"]), |
|
performer="[Akeno UB]", |
|
title=yt_data["title"], |
|
thumb=f"{yt_file}.jpg", |
|
progress=progress, |
|
progress_args=( |
|
pro, |
|
time.time(), |
|
upload_text, |
|
), |
|
) |
|
await pro.delete() |
|
except Exception as e: |
|
await log_detailed_error(e, where=client.me.id, who=message.chat.title) |
|
return await pro.edit_text(f"**π Audio not Downloaded:** `{e}`") |
|
try: |
|
os.remove(f"{yt_file}.jpg") |
|
os.remove(f"{yt_file}.mp3") |
|
except: |
|
pass |
|
|
|
@Akeno( |
|
~filters.scheduled |
|
& command(["yta"]) |
|
& filters.me |
|
& ~filters.forwarded |
|
) |
|
async def youtube_audio(client: Client, message: Message): |
|
if len(message.command) < 2: |
|
return await message.reply_text( |
|
"Give a valid youtube link to download audio." |
|
) |
|
query = await input_user(message) |
|
pro = await message.reply_text("Checking ...") |
|
status, url = YoutubeDriver.check_url(query) |
|
if not status: |
|
return await pro.edit_text(url) |
|
if client.me.is_premium: |
|
await pro.edit_text(f"{custom_loading}__Downloading audio ...__") |
|
else: |
|
await pro.edit_text(f"__Downloading audio ...__") |
|
try: |
|
with YoutubeDL(YoutubeDriver.song_options()) as ytdl: |
|
yt_data = ytdl.extract_info(url, False) |
|
yt_file = ytdl.prepare_filename(yt_data) |
|
ytdl.process_info(yt_data) |
|
if client.me.is_premium: |
|
upload_text = f"**{custom_loading}π΄ππ
ππΊπ½πππ π²πππ ...** \n\n**π³πππ
πΎ:** `{yt_data['title'][:50]}`\n**π’ππΊπππΎπ
:** `{yt_data['channel']}`" |
|
else: |
|
upload_text = f"**π΄ππ
ππΊπ½πππ π²πππ ...** \n\n**π³πππ
πΎ:** `{yt_data['title'][:50]}`\n**π’ππΊπππΎπ
:** `{yt_data['channel']}`" |
|
await pro.edit_text(upload_text) |
|
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg") |
|
with open(f"{yt_file}.jpg", "wb") as f: |
|
f.write(response.content) |
|
await message.reply_audio( |
|
f"{yt_file}.mp3", |
|
caption=f"**π§ π³πππ
πΎ:** {yt_data['title']} \n\n**π π΅ππΎππ:** `{yt_data['view_count']}` \n**β π£πππΊππππ:** `{secs_to_mins(int(yt_data['duration']))}`", |
|
duration=int(yt_data["duration"]), |
|
performer="[Akeno UB]", |
|
title=yt_data["title"], |
|
thumb=f"{yt_file}.jpg", |
|
progress=progress, |
|
progress_args=( |
|
pro, |
|
time.time(), |
|
upload_text, |
|
), |
|
) |
|
await pro.delete() |
|
except Exception as e: |
|
await log_detailed_error(e, where=client.me.id, who=message.chat.title) |
|
return await pro.edit_text(f"**π Audio not Downloaded:** `{e}`") |
|
try: |
|
os.remove(f"{yt_file}.jpg") |
|
os.remove(f"{yt_file}.mp3") |
|
except: |
|
pass |
|
|
|
@Akeno( |
|
~filters.scheduled |
|
& command(["ytva"]) |
|
& filters.me |
|
& ~filters.forwarded |
|
) |
|
async def ytvideo_search(client: Client, message: Message): |
|
if len(message.command) < 2: |
|
return await message.reply_text( |
|
"Give a valid youtube search to download video." |
|
) |
|
query = await input_user(message) |
|
results = YoutubeSearch(query, max_results=5).to_dict() |
|
watch = results[0]["url_suffix"] |
|
url_suffix = watch.split("/")[1] |
|
okk = f"https://youtube.com/{url_suffix}" |
|
pro = await message.reply_text("Checking ...") |
|
status, url = YoutubeDriver.check_url(okk) |
|
if not status: |
|
return await pro.edit_text(url) |
|
if client.me.is_premium: |
|
await pro.edit_text(f"{custom_loading}__Downloading audio ...__") |
|
else: |
|
await pro.edit_text(f"__Downloading audio ...__") |
|
try: |
|
with YoutubeDL(YoutubeDriver.video_options()) as ytdl: |
|
yt_data = ytdl.extract_info(url, True) |
|
yt_file = yt_data["id"] |
|
|
|
if client.me.is_premium: |
|
upload_text = f"**{custom_loading}π΄ππ
ππΊπ½πππ π²πππ ...** \n\n**π³πππ
πΎ:** `{yt_data['title'][:50]}`\n**π’ππΊπππΎπ
:** `{yt_data['channel']}`" |
|
else: |
|
upload_text = f"**π΄ππ
ππΊπ½πππ π²πππ ...** \n\n**π³πππ
πΎ:** `{yt_data['title'][:50]}`\n**π’ππΊπππΎπ
:** `{yt_data['channel']}`" |
|
await pro.edit_text(upload_text) |
|
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg") |
|
with open(f"{yt_file}.jpg", "wb") as f: |
|
f.write(response.content) |
|
await message.reply_video( |
|
f"{yt_file}.mp4", |
|
caption=f"**π§ π³πππ
πΎ:** {yt_data['title']} \n\n**π π΅ππΎππ:** `{yt_data['view_count']}` \n**β π£πππΊππππ:** `{secs_to_mins(int(yt_data['duration']))}`", |
|
duration=int(yt_data["duration"]), |
|
thumb=f"{yt_file}.jpg", |
|
progress=progress, |
|
progress_args=( |
|
pro, |
|
time.time(), |
|
upload_text, |
|
), |
|
) |
|
await pro.delete() |
|
except Exception as e: |
|
await log_detailed_error(e, where=client.me.id, who=message.chat.title) |
|
return await pro.edit_text(f"**π Video not Downloaded:** `{e}`") |
|
try: |
|
os.remove(f"{yt_file}.jpg") |
|
os.remove(f"{yt_file}.mp4") |
|
except: |
|
pass |
|
|
|
@Akeno( |
|
~filters.scheduled |
|
& command(["ytv"]) |
|
& filters.me |
|
& ~filters.forwarded |
|
) |
|
async def ytvideo(client: Client, message: Message): |
|
if len(message.command) < 2: |
|
return await message.reply_text( |
|
"Give a valid youtube link to download video." |
|
) |
|
query = await input_user(message) |
|
pro = await message.reply_text("Checking ...") |
|
status, url = YoutubeDriver.check_url(query) |
|
if not status: |
|
return await pro.edit_text(url) |
|
if client.me.is_premium: |
|
await pro.edit_text(f"{custom_loading}__Downloading audio ...__") |
|
else: |
|
await pro.edit_text(f"__Downloading audio ...__") |
|
try: |
|
with YoutubeDL(YoutubeDriver.video_options()) as ytdl: |
|
yt_data = ytdl.extract_info(url, True) |
|
yt_file = yt_data["id"] |
|
|
|
if client.me.is_premium: |
|
upload_text = f"**{custom_loading}π΄ππ
ππΊπ½πππ π²πππ ...** \n\n**π³πππ
πΎ:** `{yt_data['title'][:50]}`\n**π’ππΊπππΎπ
:** `{yt_data['channel']}`" |
|
else: |
|
upload_text = f"**π΄ππ
ππΊπ½πππ π²πππ ...** \n\n**π³πππ
πΎ:** `{yt_data['title'][:50]}`\n**π’ππΊπππΎπ
:** `{yt_data['channel']}`" |
|
await pro.edit_text(upload_text) |
|
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg") |
|
with open(f"{yt_file}.jpg", "wb") as f: |
|
f.write(response.content) |
|
await message.reply_video( |
|
f"{yt_file}.mp4", |
|
caption=f"**π§ π³πππ
πΎ:** {yt_data['title']} \n\n**π π΅ππΎππ:** `{yt_data['view_count']}` \n**β π£πππΊππππ:** `{secs_to_mins(int(yt_data['duration']))}`", |
|
duration=int(yt_data["duration"]), |
|
thumb=f"{yt_file}.jpg", |
|
progress=progress, |
|
progress_args=( |
|
pro, |
|
time.time(), |
|
upload_text, |
|
), |
|
) |
|
await pro.delete() |
|
except Exception as e: |
|
await log_detailed_error(e, where=client.me.id, who=message.chat.title) |
|
return await pro.edit_text(f"**π Video not Downloaded:** `{e}`") |
|
try: |
|
os.remove(f"{yt_file}.jpg") |
|
os.remove(f"{yt_file}.mp4") |
|
except: |
|
pass |
|
|
|
@Akeno( |
|
~filters.scheduled |
|
& command(["ytlink"]) |
|
& filters.me |
|
& ~filters.forwarded |
|
) |
|
async def ytlink(_, message: Message): |
|
if len(message.command) < 2: |
|
return await message.reply_text("Give something to search on youtube.") |
|
query = await input_user(message) |
|
pro = await message.reply_text("Searching ...") |
|
try: |
|
results = YoutubeDriver(query, 7).to_dict() |
|
except Exception as e: |
|
await log_detailed_error(e, where=client.me.id, who=message.chat.title) |
|
return await pro.edit_text(f"**π Error:** `{e}`") |
|
if not results: |
|
return await pro.edit_text("No results found.") |
|
text = f"**π π³πππΊπ
π±πΎπππ
ππ π₯ππππ½:** `{len(results)}`\n\n" |
|
for result in results: |
|
text += f"**π³πππ
πΎ:** `{result['title'][:50]}`\n**π’ππΊπππΎπ
:** `{result['channel']}`\n**π΅ππΎππ:** `{result['views']}`\n**π£πππΊππππ:** `{result['duration']}`\n**π«πππ:** `https://youtube.com{result['url_suffix']}`\n\n" |
|
await pro.edit_text(text, disable_web_page_preview=True) |
|
|
|
module = modules_help.add_module("youtube", __file__) |
|
module.add_command("yta", "Download the youtube link video in .mp3 format!.") |
|
module.add_command("ytv", "Download the youtube link video in .mp3 format!.") |
|
module.add_command("ytsa", "Download the youtube search video in .mp3 format!.") |
|
module.add_command("ytva", "Download the youtube search video in .mp4 format!") |
|
module.add_command("ytlink", "Search for a video on youtube") |