"""Streamlit front end that downloads a magnet link through a local aria2 RPC daemon.

The script starts ``aria2c`` in daemon mode if it is not already running,
submits magnet links over RPC via ``aria2p``, tracks progress in a background
thread, and offers the finished file through a Streamlit download button.
"""

import os
import subprocess
import threading
import time
from pathlib import Path

import aria2p  # switched to the more stable aria2p library
import streamlit as st

# aria2 RPC configuration
ARIA2_RPC_HOST = "http://localhost"
ARIA2_RPC_PORT = 6800
ARIA2_SECRET = "SECRET_KEY"


def _aria2_running():
    """Return True when ``pgrep`` reports at least one running ``aria2c`` process."""
    try:
        probe = subprocess.run(
            ["pgrep", "aria2c"], capture_output=True, text=True, check=False
        )
    except OSError:
        # pgrep itself is unavailable: treat it as "not running" so we try to start one.
        return False
    return bool(probe.stdout.strip())


def init_aria2():
    """Start the aria2 daemon if needed and return an ``aria2p.Client`` for it.

    Raises:
        Exception: any failure is shown with ``st.error`` and then re-raised.
    """
    try:
        # Start the aria2 background process automatically.
        if not _aria2_running():
            download_dir = os.path.abspath("./downloads")
            os.makedirs(download_dir, exist_ok=True)
            cmd = [
                "aria2c",
                "--enable-rpc",
                f"--rpc-listen-port={ARIA2_RPC_PORT}",
                f"--rpc-secret={ARIA2_SECRET}",
                f"--dir={download_dir}",
                "--daemon=true",
                "--max-concurrent-downloads=5",
                "--split=16",
                "--max-connection-per-server=16",
                "--file-allocation=prealloc",
            ]
            # Pass an argv list instead of a joined shell string so a download
            # directory containing spaces or shell metacharacters is not split.
            # With --daemon=true the launcher process is expected to return
            # right after forking, so no helper thread is used.
            subprocess.run(cmd, check=False)
            # Give the RPC server a moment to come up.
            time.sleep(2)

        # Build the API client.
        return aria2p.Client(
            host=ARIA2_RPC_HOST,
            port=ARIA2_RPC_PORT,
            secret=ARIA2_SECRET,
        )
    except Exception as e:
        st.error(f"初始化失败: {str(e)}")
        raise


def _new_job():
    """Return a fresh state record for one download job.

    The record is a plain dict so the worker thread can update it without
    touching ``st.session_state``; the UI thread only reads it (and sets the
    cancel event).
    """
    return {
        "progress": 0.0,  # percentage, 0-100
        "downloading": False,
        "complete": False,
        "filename": None,
        "error": None,
        "cancel": threading.Event(),
    }


def _rerun():
    """Rerun the script with whichever rerun API this Streamlit version provides."""
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()


def _poll_until_done(download, job):
    """Poll *download* once a second until it is neither active nor waiting.

    Writes the current percentage into ``job["progress"]``. Returns ``False``
    if the job's cancel event was set (the aria2 task is removed first),
    ``True`` otherwise.
    """
    while True:
        download.update()
        if job["cancel"].is_set():
            download.remove(force=True)
            return False
        # aria2p already reports progress as a percentage; clamp for st.progress.
        job["progress"] = min(max(float(download.progress), 0.0), 100.0)
        if not (download.is_active or download.is_waiting):
            return True
        time.sleep(1)


def download_torrent(magnet_link, api=None, job=None):
    """Download *magnet_link* through aria2 and record the outcome in *job*.

    Args:
        magnet_link: the magnet URI to hand to aria2.
        api: ``aria2p.API`` to use; defaults to ``st.session_state.aria_api``.
        job: state dict from ``_new_job``; defaults to
            ``st.session_state.download_job``. Pass both explicitly when
            calling from a background thread, where session state is not
            available.

    All exceptions are caught and reported through ``job["error"]``.
    """
    if api is None:
        api = st.session_state.aria_api
    if job is None:
        job = st.session_state.download_job

    try:
        # Queue the download task.
        download = api.add_magnet(
            magnet_link,
            options={
                "max-download-limit": "0",
                "seed-time": "0",
                "bt-detach-seed-only": "true",
            },
        )
        job["error"] = None
        job["downloading"] = True

        # Progress monitoring.
        while True:
            if not _poll_until_done(download, job):
                job["error"] = "下载已取消"
                return
            if not (download.is_complete and download.followed_by_ids):
                break
            # A magnet link first produces a metadata-only task; the actual
            # transfer is the follow-up task aria2 spawns from it.
            download = api.get_download(download.followed_by_ids[0])
            job["progress"] = 0.0

        if download.is_complete:
            # Publish the filename before the flag so the UI never sees a
            # completed job without its path.
            job["filename"] = str(download.files[0].path) if download.files else None
            job["complete"] = True
        else:
            raise RuntimeError(download.error_message or f"status={download.status}")
    except Exception as e:
        job["error"] = f"下载失败: {str(e)}"
    finally:
        job["downloading"] = False


def start_download_thread(magnet_link):
    """Create a fresh job record and run ``download_torrent`` in a daemon thread.

    Returns the started ``threading.Thread``.
    """
    job = _new_job()
    job["downloading"] = True
    st.session_state.download_job = job
    worker = threading.Thread(
        target=download_torrent,
        args=(magnet_link, st.session_state.aria_api, job),
        daemon=True,
    )
    worker.start()
    return worker


# Session state initialisation
if "aria_client" not in st.session_state:
    st.session_state.aria_client = init_aria2()
if "aria_api" not in st.session_state:
    st.session_state.aria_api = aria2p.API(st.session_state.aria_client)
if "download_job" not in st.session_state:
    st.session_state.download_job = _new_job()

# Page layout
st.title("磁力链接下载器 🧲 (Aria2版)")

with st.form("magnet_form"):
    magnet_link = st.text_input("输入磁力链接:", placeholder="magnet:?xt=urn:btih:...")
    submitted = st.form_submit_button("开始下载")

if submitted and magnet_link:
    if st.session_state.download_job["downloading"]:
        st.warning("当前已有下载任务在进行中")
    else:
        # A new job record resets progress, completion, filename and error.
        start_download_thread(magnet_link)

# Read the job only after a possible submit so a just-started job is shown.
job = st.session_state.download_job

if job["downloading"]:
    st.info("下载状态")
    progress_col, control_col = st.columns([4, 1])
    with progress_col:
        current_progress = job["progress"]
        st.progress(int(current_progress))
        st.write(f"当前进度:{current_progress:.1f}%")
    with control_col:
        if st.button("取消下载"):
            # Signal the worker, which removes the task from aria2 on its next poll.
            job["cancel"].set()
            job["downloading"] = False
            job["complete"] = False
            job["error"] = "下载已取消"
            _rerun()

if job["error"]:
    st.error(job["error"])

if job["complete"]:
    st.success("下载完成!✅")
    if job["filename"] and Path(job["filename"]).exists():
        file_path = Path(job["filename"])
        file_size = file_path.stat().st_size
        st.write(f"文件大小:{file_size/1024/1024:.2f} MB")
        with open(file_path, "rb") as f:
            st.download_button(
                label="下载文件",
                data=f,
                file_name=file_path.name,
                mime="application/octet-stream",
            )
    else:
        st.error("文件不存在,可能下载失败")

# Usage notes
st.markdown("---")
st.info("""
**使用说明:**
1. 确保已安装 aria2:`brew install aria2` (macOS) 或 `sudo apt install aria2` (Linux)
2. 首次运行会自动启动 aria2 后台服务
3. 下载文件将保存到当前目录的 downloads 文件夹
""")

# Auto-refresh: while a download is running, rerun once a second so the
# progress bar follows the worker thread without user interaction.
if job["downloading"]:
    time.sleep(1)
    _rerun()