import streamlit as st import libtorrent as lt import os import time import threading from pathlib import Path # 下载状态类 class DownloadStatus: def __init__(self): self.progress = 0.0 self.download_rate = 0.0 self.remaining_time = 0.0 self.total_size = 0 self.downloaded_size = 0 self.status = "等待中" self.file_path = None def human_readable_size(size): """转换文件大小到易读格式""" units = ['B', 'KB', 'MB', 'GB', 'TB'] index = 0 while size >= 1024 and index < 4: size /= 1024 index += 1 return f"{size:.2f} {units[index]}" def configure_session(): """兼容不同版本的会话配置""" try: # 新版配置方式 settings = lt.settings_pack() settings.set_str(lt.settings_pack.listen_interfaces, '0.0.0.0:6881') ses = lt.session(settings) except AttributeError: # 旧版配置方式 ses = lt.session() ses.listen_on(6881, 6891) return ses def parse_magnet_compat(magnet_uri): """兼容不同版本的磁力解析方法""" try: return lt.parse_magnet_uri(magnet_uri) except AttributeError: # 旧版处理方式 params = {'save_path': '.', 'storage_mode': lt.storage_mode_t(2)} return (magnet_uri, params) def download_task(magnet_uri, download_path, status): """后台下载任务""" try: ses = configure_session() status.status = "解析磁力链接..." # 兼容解析磁力链接 try: params = lt.parse_magnet_uri(magnet_uri) params.save_path = download_path params.storage_mode = lt.storage_mode_t.storage_mode_sparse handle = ses.add_torrent(params) except: # 旧版处理方式 handle = lt.add_magnet_uri(ses, magnet_uri, {'save_path': download_path}) status.status = "获取元数据..." # 等待元数据,带超时机制 timeout = 30 # 30秒超时 start = time.time() while not handle.has_metadata(): if time.time() - start > timeout: raise TimeoutError("获取元数据超时") time.sleep(0.5) try: ses.post_torrent_updates() # 新版状态更新 except: pass torrent_info = handle.get_torrent_info() status.total_size = torrent_info.total_size() status.status = "下载中..." # 主下载循环 start_time = time.time() last_downloaded = 0 while not handle.is_seed(): try: ses.post_torrent_updates() # 新版需要状态更新 except: pass s = handle.status() # 计算下载速度 now = time.time() dt = now - start_time status.downloaded_size = s.total_done status.download_rate = (s.total_done - last_downloaded) / dt if dt > 0 else 0 last_downloaded = s.total_done start_time = now # 计算进度和剩余时间 status.progress = s.progress if status.download_rate > 0: status.remaining_time = (status.total_size - s.total_done) / status.download_rate else: status.remaining_time = 0 time.sleep(1) status.status = "下载完成" status.file_path = os.path.join(download_path, handle.name()) except Exception as e: status.status = f"错误: {str(e)}" def main(): st.title("🕹️ 磁力链接下载器") # 初始化session状态 if 'download_status' not in st.session_state: st.session_state.download_status = DownloadStatus() # 输入区域 with st.form("magnet_form"): magnet_uri = st.text_input("磁力链接:", placeholder="magnet:?xt=urn:...") submitted = st.form_submit_button("开始下载") # 创建下载目录 download_path = Path("downloads") download_path.mkdir(exist_ok=True) if submitted and magnet_uri: if not magnet_uri.startswith("magnet:?"): st.error("无效的磁力链接格式") return # 启动下载线程 st.session_state.download_status = DownloadStatus() thread = threading.Thread( target=download_task, args=(magnet_uri, download_path, st.session_state.download_status) ) thread.start() # 实时状态显示 status = st.session_state.download_status status_container = st.empty() while status.status not in ["下载完成", "错误"] and status.status != "等待中": with status_container.container(): # 进度条 st.progress(status.progress, text=f"进度: {status.progress*100:.1f}%") # 下载信息 cols = st.columns(4) cols[0].metric("总大小", human_readable_size(status.total_size) if status.total_size > 0 else "--") cols[1].metric("已下载", human_readable_size(status.downloaded_size)) speed_display = (f"{status.download_rate/1024:.2f} MB/s" if status.download_rate > 1024 else f"{status.download_rate:.2f} KB/s") if status.download_rate > 0 else "--" cols[2].metric("速度", speed_display) time_display = f"{status.remaining_time:.1f}秒" if status.remaining_time > 0 else "--" cols[3].metric("剩余时间", time_display) st.caption(f"状态: {status.status}") time.sleep(0.5) # 下载完成后的处理 if status.status == "下载完成": st.success("🎉 下载完成!") with open(status.file_path, "rb") as f: st.download_button( label="保存文件", data=f, file_name=os.path.basename(status.file_path), mime="application/octet-stream" ) if __name__ == "__main__": main()