Spaces:
Running
Running
from telethon import events, utils, Button, TelegramClient | |
import asyncio, os, time, re, sys, threading, random, subprocess, json, base64, requests, shutil, zipfile | |
from bs4 import BeautifulSoup | |
from FastTelethon import download_file, upload_file | |
import subprocess | |
api_id = 9024532 | |
api_hash = "131b576240be107210aace99a5f5c5b0" | |
token = "7829277186:AAGZKtL06JxT5yp0FVNMjFy9yBKJiRpM-Y0" | |
client = TelegramClient("AutoD", api_id, api_hash) | |
class Timer: | |
def __init__(self, time_between=2): | |
self.start_time = time.time() | |
self.time_between = time_between | |
def can_send(self): | |
if time.time() > (self.start_time + self.time_between): | |
self.start_time = time.time() | |
return True | |
return False | |
def execute_command(command): | |
try: | |
result = subprocess.run(command, shell=True, capture_output=True, text=True) | |
if result.stdout: | |
return str(result.stdout) | |
if result.stderr: | |
return str(result.stderr) | |
except Exception as e: | |
return str(f"Error: {e}") | |
def folder_size(folder_path): | |
total_size = 0 | |
for dirpath, dirnames, filenames in os.walk(folder_path): | |
for f in filenames: | |
fp = os.path.join(dirpath, f) | |
total_size += os.path.getsize(fp) | |
return total_size | |
def split_ext(file=None, dir="./",size=3): | |
size = size * 1024 * 1024 | |
files = [] | |
tamano_archivo = os.path.getsize(file) | |
if tamano_archivo < size: | |
files.append(file) | |
else: | |
with open(file, 'rb') as archivo_original: | |
parte = 1 | |
totalparte = (tamano_archivo//size)+1 | |
while True: | |
contenido_parte = archivo_original.read(size) | |
if not contenido_parte: | |
break | |
nombre_parte = f"{file.split('/')[-1].split('.')[0]}.7z.{str(parte).zfill(3)}" | |
with open(dir+"/"+nombre_parte, 'wb') as archivo_parte: | |
archivo_parte.write(contenido_parte) | |
files.append(nombre_parte) | |
parte += 1 | |
os.unlink(file) | |
return files | |
def sizeof_fmt(num, suffix='B'): | |
for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: | |
if abs(num) < 1024.0: | |
return "%3.2f%s%s" % (num, unit, suffix) | |
num /= 1024.0 | |
return "%.2f%s%s" % (num, 'Yi', suffix) | |
if not os.path.exists("./files"): | |
os.mkdir("./files") | |
client.start(bot_token=token) | |
users = [] | |
process = [] | |
basedata = {} | |
cdpaths = [] | |
async def mybot(event): | |
global basedata | |
global process | |
global cdpaths | |
if len(cdpaths) > 0: | |
cd = "/"+"/".join(cdpaths) | |
else: | |
cd = "/" | |
text = event.raw_text | |
user_id = event.message.chat.id | |
msg_id = event.message.id | |
user_name = event.message.chat.username | |
mode = "auto" | |
try: | |
mode = basedata[f"{user_id}_mode"] | |
except: | |
None | |
if len(users) == 0: | |
users.append(user_id) | |
if not user_id in users: | |
await event.reply("<b>¡Acceso no autorizado!</b>", parse_mode="HTML") | |
raise ValueError(f"({user_id}) No es un usuario autorizado") | |
if not os.path.exists(f"./files"): | |
os.mkdir(f"./files") | |
if not os.path.exists(f"./files_temp"): | |
os.mkdir(f"./files_temp") | |
directorio = f'./files'+cd | |
files = [(archivo, os.path.getmtime(os.path.join(directorio, archivo))) for archivo in os.listdir(directorio)] | |
file = sorted(files, key=lambda x: x, reverse=True) | |
files = [] | |
for f in file: | |
x,y = f | |
files.append(x) | |
if "/start" == text: | |
basedata[f"{user_id}_username"] = user_name | |
await event.reply(f"<code>AUTOD • TechDev</code>", parse_mode="HTML", buttons=[[Button.inline('?', b'')]]) | |
elif "/add " in text: | |
user = int(text.replace("/add", "")) | |
users.append(user) | |
await event.reply("Usuario añadido") | |
elif "/ban " in text: | |
user = int(text.replace("/ban", "")) | |
users.append(user) | |
await event.reply("Usuario borrado") | |
elif "/ls" in text: | |
msg = f"#LISTDIR #GOOD\n<code>{'/'.join(cdpaths)}</code>\n<b>LÍMITE - {sizeof_fmt(folder_size('./files'))}/100.0GiB</b>\n\n" | |
for f in files: | |
if os.path.isfile("./files"+cd+"/"+f): | |
msg += f" 📃 <code>{f}</code>\n" | |
else: | |
msg += f" 📁 <code>{f}</code>\n" | |
if len(files) == 0: | |
msg += "<i>No hay archivos</i>" | |
await event.reply(msg, parse_mode="HTML") | |
elif "/rm " in text: | |
path = text.replace("/rm ", "") | |
try: | |
if os.path.isfile("./files"+cd+"/"+path): | |
os.unlink("./files"+cd+"/"+path) | |
else: | |
shutil.rmtree("./files"+cd+"/"+path) | |
msg = "#REMOVE #GOOD\n\n<b>✓ ¡Directorio Eliminado!</b>" | |
except: | |
msg = "#REMOVE #ERROR\n\n<b>No existe el directorio</b>" | |
await event.reply(msg, parse_mode="HTML") | |
elif "/cd " in text: | |
path = text.replace("/cd ", "") | |
if not os.path.isdir("./files/"+cd+"/"+path): | |
cdpaths = [] | |
msg = "#CHANGEDIR #ERROR\n\n<b>No existe el directorio</b>" | |
else: | |
cdpaths = path.split("/") | |
msg = "#CHANGEDIR #GOOD\n\n<b>✓ ¡Directorio abierto!</b>" | |
directorio = f'./files/'+"/".join(cdpaths) | |
files = [(archivo, os.path.getmtime(os.path.join(directorio, archivo))) for archivo in os.listdir(directorio)] | |
file = sorted(files, key=lambda x: x, reverse=True) | |
files = [] | |
for f in file: | |
x,y = f | |
files.append(x) | |
msg = f"\n\n<code>{'/'.join(cdpaths)}</code>\n<b>LÍMITE - {sizeof_fmt(folder_size('./files'))}/100.0GiB</b>\n\n" | |
for f in files: | |
if os.path.isfile("./files"+cd+"/"+f): | |
msg += f" 📃 <code>{f}</code>\n" | |
else: | |
msg += f" 📁 <code>{f}</code>\n" | |
if len(files) == 0: | |
msg += "<i>No hay archivos</i>" | |
await event.reply(msg, parse_mode="HTML") | |
elif "/mkdir " in text: | |
dir = text.replace("/mkdir ", "") | |
os.mkdir("./files"+cd+"/"+dir) | |
msg = "#MAKEDIR #GOOD\n\n<b>✓ ¡DIRECTORIO CREADO!</b>" | |
cdpaths.append(dir) | |
directorio = f'./files/'+"/".join(cdpaths) | |
files = [(archivo, os.path.getmtime(os.path.join(directorio, archivo))) for archivo in os.listdir(directorio)] | |
file = sorted(files, key=lambda x: x, reverse=True) | |
files = [] | |
for f in file: | |
x,y = f | |
files.append(x) | |
msg = f"\n\n<code>{'/'.join(cdpaths)}</code>\n<b>LÍMITE - {sizeof_fmt(folder_size('./files'))}/100.0GiB</b>\n\n" | |
for f in files: | |
if os.path.isfile("./files"+cd+"/"+f): | |
msg += f" 📃 <code>{f}</code>\n" | |
else: | |
msg += f" 📁 <code>{f}</code>\n" | |
if len(files) == 0: | |
msg += "<i>No hay archivos</i>" | |
await event.reply(msg, parse_mode="HTML") | |
elif event.document and user_id in users: | |
code = str(random.randint(100000,999999)) | |
process.append(code) | |
while not code == process[0]: | |
await asyncio.sleep(3) | |
msging = "<b>PREPARANDO</b>" | |
msg = await event.reply(msging, parse_mode="HTML",buttons=[Button.inline("CANCELAR",b'del|'+code.encode('utf-8'))]) | |
await msg.edit("<b>DESCARGANDO</b>", parse_mode="HTML",buttons=[Button.inline("CANCELAR",b'del|'+code.encode('utf-8'))]) | |
basedata[code] = "down:0/0"+"|CARGANDO..." | |
basedata[code+"auto-act"] = True | |
caption = event.message.message | |
mime_type = str(event.media.document.mime_type) | |
extension = mime_type.split("/")[1] | |
if event.file.name: | |
name = event.file.name | |
else: | |
if caption: | |
name = str(caption).split("\n")[0]+"."+extension | |
else: | |
name = "document_"+str(time.strftime("%d_%H_%M_%S"))+"."+extension | |
if os.path.exists("./files/"+name): | |
name = name.replace(".", f"_{str(time.strftime('%d%H%M%S'))}.") | |
timer = Timer() | |
async def progress_bar(current, total): | |
if not code in process: | |
raise ValueError(f"Se detuvo el proceso {code}") | |
basedata[code] = "down:"+str(current)+"/"+str(total) | |
if basedata[code+"auto-act"]: | |
text1 = "<b>DESCARGANDO</b>" | |
bar = f"<i>{round((current/total)*100,1)}%</i> <b>[</b>" | |
prog = round(int(15*(current/total)),0) | |
deprog = 15-prog | |
bar += ("-"*prog)+"|" | |
bar += ("-"*deprog)+"<b>]</b>" | |
if len(process) > 1: | |
infopro = f"\n+{len(process)-1} procesos..." | |
else: | |
infopro = "" | |
if timer.can_send(): | |
await msg.edit(f"{text1}\n\n<b>Nombre:</b> <code>{name}</code>\n<b>Progreso:</b> {bar}\n<b>Tamaño:</b> <code>{sizeof_fmt(total)}</code>\n\n{infopro}", parse_mode="HTML", buttons=[Button.inline("CANCELAR", b'del|'+code.encode("utf8"))]) | |
with open("./files_temp/"+name, "wb") as out: | |
await download_file(event.client, event.document, out, progress_callback=progress_bar) | |
os.rename("./files_temp/"+name, "./files"+cd+"/"+name) | |
upmsg = await event.reply(f"#DOWNLOAD #GOOD\n\n<b>✓ ¡Proceso completado!</b>", parse_mode="HTML") | |
await client.delete_messages(user_id, msg.id) | |
basedata[code] = None | |
process.remove(code) | |
async def callback_query(event): | |
global basedata | |
global process | |
user_id = event.query.user_id | |
msg_id = event.query.msg_id | |
data = event.query.data.decode("utf-8") | |
orden = str(data).split("|")[0] | |
if orden == "del": | |
code = str(data).split("|")[1] | |
try: | |
process.remove(code) | |
except: | |
None | |
await client.delete_messages(user_id, msg_id) | |
print("CONECTADO") | |
client.run_until_disconnected() |