import re from pyrogram import * from pyrogram.types import * from pyrogram.errors import * from pyrogram.enums import * from akn.AllDownloaderBot.support_sites import * from akn.utils.logger import * from akn.utils.database import db def is_video_url(url: str) -> bool: patterns = ( r"tiktok\.com/(?:@|v/|t/|video/)", r"(?:youtube\.com/watch\?v=|youtu\.be/)[\w-]{11}", r"instagram\.com/(?:p|reel)/[\w-]+", r"(?:twitter\.com|x\.com)/\w+/status/\d+", r"facebook\.com/(?:watch|reel|share)/[\w-]+", ) return any(re.search(pattern, url, re.IGNORECASE) for pattern in patterns) @Client.on_message( ~filters.scheduled & filters.command(["alldl"]) & ~filters.forwarded ) async def allone_downloader(client: Client, message: Message): user_id = message.from_user.id if await db.get_alldlbot_is_blacklist_user(client.me.id, user_id): return if message.chat.type.value != "supergroup": return await message.reply_text("You can only group public not private") await db.update_alldlbot_broadcast_in_group(client.me.id, message.chat.id, "add") user_name = message.from_user.first_name nn = "[" + user_name + "](tg://user?id=" + str(user_id) + ")" if not await db.get_forcesub_only_bot(client.me.id): return await message.reply_text("This is the command /addjoin username channel and becomes admin on the channel") update_channel = await db.get_forcesub_only_bot(client.me.id) if update_channel: try: user = await client.get_chat_member(update_channel, user_id) if user.status == ChatMemberStatus.BANNED: await client.send_message( message.chat.id, text=f"**❌ {nn} anda telah di blokir dari grup dukungan**", disable_web_page_preview=True, ) return except UserNotParticipant: await client.send_message( message.chat.id, text=f"**👋🏻 Halo {nn}\nᴜɴᴛᴜᴋ ᴍᴇɴɢʜɪɴᴅᴀʀɪ ᴘᴇɴɢɢᴜɴᴀᴀɴ ʏᴀɴɢ ʙᴇʀʟᴇʙɪʜᴀɴ ʙᴏᴛ ɪɴɪ ᴅɪ ᴋʜᴜsᴜsᴋᴀɴ ᴜɴᴛᴜᴋ ʏᴀɴɢ sᴜᴅᴀʜ ᴊᴏɪɴ ᴅɪ ᴄʜᴀɴɴᴇʟ ᴋᴀᴍɪ​!**", reply_markup=InlineKeyboardMarkup( [ [ InlineKeyboardButton( "Jᴏɪɴ Cʜᴀɴɴᴇʟ Dᴜʟᴜ", url=f"https://t.me/{update_channel}", ) ] ] ), ) except ChatAdminRequired: return await message.reply_text("Chat admins Required: before /addjoin username without @ and add bot to your channel as admin") query_url = message.text.split(" ", 1)[1] if len(message.command) > 1 else None if not query_url: return await message.reply_text("?") if not is_video_url(query_url): return await message.reply_text("Invalid link") dll = await message.reply_text("Processing....") try: result, caption = all_one_downloader(query_url) if result: await message.reply_video( result, caption=caption, disable_notification=True ) await dll.delete() return else: return await message.reply_text("Try again") except Exception as e: await dll.delete() LOGS.error(f"Error yt: {type(e).__name__}") await message.reply_text(f"Free: Upgrade to premium only cookies")