"""YouTube download and search command handlers for the bot."""

import asyncio
import contextlib
import os
import time
from typing import Optional, Tuple

import requests
from pyrogram.types import *
from pyrogram import Client, filters
from youtube_search import YoutubeSearch
from yt_dlp import YoutubeDL

from akn.utils.database import db
from akn.utils.driver import YoutubeDriver
from akn.utils.formatter import secs_to_mins
from akn.utils.logger import LOGS
from akn.utils.scripts import progress

custom_loading = "🗿"

COMMAND_LIST = """
/yta: Download the youtube link video in .mp3 format!.
/ytv: Download the youtube link video in .mp4 format!.
/ytsa: Download the youtube search video in .mp3 format!.
/ytva: Download the youtube search video in .mp4 format!
/ytlink: Search for a video on youtube
"""

# Thumbnail endpoint used for every upload; ``video_id`` is the YouTube id.
_THUMB_URL = "https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
# Seconds to wait for the thumbnail host before giving up on the thumbnail.
_HTTP_TIMEOUT = 15


async def input_user(message: Message) -> str:
    """Return the text that follows the command, or "" when there is none.

    The argument is taken from the message text (falling back to the caption,
    since commands can also arrive as media captions) and is everything after
    the first run of whitespace, stripped.
    """
    if not message.command or len(message.command) < 2:
        return ""
    raw = message.text or message.caption or ""
    # Split on any whitespace so "/cmd\narg" and "/cmd\targ" work as well.
    parts = raw.split(None, 1)
    return parts[1].strip() if len(parts) > 1 else ""


def _loading_prefix(client: Client) -> str:
    """Return the loading emoji for premium accounts, otherwise ""."""
    return custom_loading if client.me.is_premium else ""


def _upload_text(client: Client, yt_data: dict, video: bool) -> str:
    """Build the status text shown while the media is being uploaded."""
    label = "𝖴𝗉𝗅𝗈𝖺𝖽𝗂𝗇𝗀 𝖵𝗂𝖽𝖾𝗈" if video else "𝖴𝗉𝗅𝗈𝖺𝖽𝗂𝗇𝗀 𝖲𝗈𝗇𝗀"
    return (
        f"**{_loading_prefix(client)}{label} ...** \n\n"
        f"**𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n"
        f"**𝖢𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`"
    )


def _caption(yt_data: dict) -> str:
    """Build the caption attached to the uploaded audio/video."""
    return (
        f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n"
        f"**👀 𝖵𝗂𝖾𝗐𝗌:** `{yt_data['view_count']}` \n"
        f"**⌛ 𝖣𝗎𝗋𝖺𝗍𝗂𝗈𝗇:** `{secs_to_mins(int(yt_data['duration']))}`"
    )


def _first_result_url(query: str) -> Optional[str]:
    """Return the URL of the first search hit for *query*, or None (blocking)."""
    results = YoutubeSearch(query, max_results=1).to_dict()
    if not results:
        return None
    # Keep the whole suffix: splitting on "/" would truncate multi-segment
    # paths such as "/shorts/<id>" to just "shorts".
    return f"https://youtube.com{results[0]['url_suffix']}"


def _download_audio(url: str) -> Tuple[dict, str]:
    """Download *url* as audio; return (info dict, output base name). Blocking."""
    with YoutubeDL(YoutubeDriver.song_options()) as ytdl:
        yt_data = ytdl.extract_info(url, False)
        base = ytdl.prepare_filename(yt_data)
        ytdl.process_info(yt_data)
    return yt_data, base


def _download_video(url: str) -> Tuple[dict, str]:
    """Download *url* as video; return (info dict, output base name). Blocking."""
    with YoutubeDL(YoutubeDriver.video_options()) as ytdl:
        yt_data = ytdl.extract_info(url, True)
    return yt_data, yt_data["id"]


def _fetch_thumbnail(video_id: str, dest: str) -> Optional[str]:
    """Save the video thumbnail to *dest*; return *dest*, or None on failure.

    The thumbnail is best-effort: a network error or non-2xx response must not
    prevent the media itself from being sent.
    """
    try:
        response = requests.get(
            _THUMB_URL.format(video_id=video_id), timeout=_HTTP_TIMEOUT
        )
        response.raise_for_status()
    except requests.RequestException:
        return None
    with open(dest, "wb") as f:
        f.write(response.content)
    return dest


def _remove_files(*paths: Optional[str]) -> None:
    """Best-effort removal of temporary files; unset or missing paths are skipped."""
    for path in paths:
        if not path:
            continue
        with contextlib.suppress(OSError):
            os.remove(path)


async def _download_and_send(
    client: Client, message: Message, pro: Message, url: str, video: bool
):
    """Download *url*, upload it as a reply to *message*, then clean up.

    *pro* is the status message that is edited while working and deleted on
    success. On failure the error is written into *pro* and that edit's result
    is returned. Temporary files are removed on both paths.
    """
    noun = "video" if video else "audio"
    await pro.edit_text(f"{_loading_prefix(client)}__Downloading {noun} ...__")

    loop = asyncio.get_running_loop()
    media_path = thumb_path = None
    try:
        # yt-dlp and requests are blocking; keep them off the event loop so
        # other updates are still served during long downloads.
        downloader = _download_video if video else _download_audio
        yt_data, base = await loop.run_in_executor(None, downloader, url)
        media_path = f"{base}.mp4" if video else f"{base}.mp3"

        upload_text = _upload_text(client, yt_data, video)
        await pro.edit_text(upload_text)

        thumb_path = await loop.run_in_executor(
            None, _fetch_thumbnail, yt_data["id"], f"{base}.jpg"
        )

        common = dict(
            caption=_caption(yt_data),
            duration=int(yt_data["duration"]),
            thumb=thumb_path,
            progress=progress,
            progress_args=(pro, time.time(), upload_text),
        )
        if video:
            await message.reply_video(media_path, **common)
        else:
            await message.reply_audio(
                media_path,
                performer="[Akeno UB]",
                title=yt_data["title"],
                **common,
            )
        await pro.delete()
    except Exception as e:
        failed = "Video" if video else "Audio"
        return await pro.edit_text(f"**🍀 {failed} not Downloaded:** `{e}`")
    finally:
        # Runs on success and failure, so a failed upload no longer leaves the
        # downloaded media and thumbnail behind.
        _remove_files(thumb_path, media_path)


async def _handle_download(
    client: Client, message: Message, video: bool, search: bool
):
    """Shared body of the four download commands.

    With *search* the argument is treated as search terms and the first hit is
    downloaded; otherwise it is treated as a link.
    """
    noun = "video" if video else "audio"
    query = await input_user(message)
    if not query:
        source = "search" if search else "link"
        return await message.reply_text(
            f"Give a valid youtube {source} to download {noun}."
        )

    pro = await message.reply_text("Checking ...")

    link = query
    if search:
        try:
            link = await asyncio.get_running_loop().run_in_executor(
                None, _first_result_url, query
            )
        except Exception as e:
            return await pro.edit_text(f"**🍀 Error:** `{e}`")
        if link is None:
            return await pro.edit_text("No results found.")

    # NOTE(review): the first value returned by check_url looks like a validity
    # flag, but the check on it was disabled in the original code, so it is
    # still ignored here - confirm against YoutubeDriver before re-enabling.
    _, url = YoutubeDriver.check_url(link)
    return await _download_and_send(client, message, pro, url, video)


@Client.on_message(
    filters.command(["start"])
    & ~filters.forwarded
)
async def startbot(client: Client, message: Message):
    """Reply to /start with the command list and the developer/channel buttons."""
    buttons = [
        [
            InlineKeyboardButton(
                text="Developer",
                url="https://t.me/xpushz"
            ),
            InlineKeyboardButton(
                text="Channel",
                url="https://t.me/RendyProjects"
            ),
        ]
    ]
    await message.reply_text(
        text=COMMAND_LIST,
        disable_web_page_preview=True,
        reply_markup=InlineKeyboardMarkup(buttons)
    )


@Client.on_message(
    filters.command(["ytsa"])
    & ~filters.forwarded
)
async def youtube_search_audio(client: Client, message: Message):
    """/ytsa <terms>: download the first search hit as audio."""
    return await _handle_download(client, message, video=False, search=True)


@Client.on_message(
    filters.command(["yta"])
    & ~filters.forwarded
)
async def youtube_audio(client: Client, message: Message):
    """/yta <link>: download the linked video as audio."""
    return await _handle_download(client, message, video=False, search=False)


@Client.on_message(
    filters.command(["ytva"])
    & ~filters.forwarded
)
async def ytvideo_search(client: Client, message: Message):
    """/ytva <terms>: download the first search hit as video."""
    return await _handle_download(client, message, video=True, search=True)


@Client.on_message(
    filters.command(["ytv"])
    & ~filters.forwarded
)
async def ytvideo(client: Client, message: Message):
    """/ytv <link>: download the linked video as video."""
    return await _handle_download(client, message, video=True, search=False)


@Client.on_message(
    filters.command(["ytlink"])
    & ~filters.forwarded
)
async def ytlink(_, message: Message):
    """/ytlink <terms>: list search results with title, channel, views and link."""
    if len(message.command) < 2:
        return await message.reply_text("Give something to search on youtube.")
    query = await input_user(message)
    pro = await message.reply_text("Searching ...")
    try:
        # The search does network I/O; run it off the event loop.
        results = await asyncio.get_running_loop().run_in_executor(
            None, lambda: YoutubeDriver(query, 7).to_dict()
        )
    except Exception as e:
        return await pro.edit_text(f"**🍀 Error:** `{e}`")
    if not results:
        return await pro.edit_text("No results found.")

    header = f"**🔎 𝖳𝗈𝗍𝖺𝗅 𝖱𝖾𝗌𝗎𝗅𝗍𝗌 𝖥𝗈𝗎𝗇𝖽:** `{len(results)}`\n\n"
    entries = (
        f"**𝖳𝗂𝗍𝗅𝖾:** `{result['title'][:50]}`\n"
        f"**𝖢𝗁𝖺𝗇𝗇𝖾𝗅:** `{result['channel']}`\n"
        f"**𝖵𝗂𝖾𝗐𝗌:** `{result['views']}`\n"
        f"**𝖣𝗎𝗋𝖺𝗍𝗂𝗈𝗇:** `{result['duration']}`\n"
        f"**𝖫𝗂𝗇𝗄:** `https://youtube.com{result['url_suffix']}`\n\n"
        for result in results
    )
    await pro.edit_text(header + "".join(entries), disable_web_page_preview=True)