Spaces:
Sleeping
Sleeping
File size: 5,365 Bytes
31d1f71 5c118c1 31d1f71 0582fea 31d1f71 7d040bf 31d1f71 5c118c1 7d040bf 5c118c1 7d040bf 5c118c1 7d040bf 5c118c1 0582fea 7d040bf 31d1f71 7d040bf 5c118c1 7d040bf 0582fea 7d040bf 5c118c1 7d040bf 5c118c1 7d040bf 5c118c1 31d1f71 7d040bf 5c118c1 0582fea 7d040bf 0582fea 7d040bf 0582fea 7d040bf 0582fea 7d040bf 0582fea 7d040bf 0582fea 7d040bf 0582fea |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
import streamlit as st
import aria2p # 改用更稳定的 aria2p 库
import time
import threading
import os
from pathlib import Path
# Aria2 RPC connection settings.
# The RPC secret may be supplied through the ARIA2_SECRET environment variable
# so a real token does not have to live in the source; when the variable is
# unset or empty the previous placeholder value is used, so existing setups
# behave exactly as before.
ARIA2_RPC_HOST = "http://localhost"
ARIA2_RPC_PORT = 6800
ARIA2_SECRET = os.environ.get("ARIA2_SECRET") or "SECRET_KEY"
# Initialise the aria2 connection.
def init_aria2():
    """Make sure an aria2c RPC daemon is running and return a client for it.

    When ``pgrep aria2c`` reports no matching process, an ``aria2c`` daemon is
    started with RPC enabled on ``ARIA2_RPC_PORT``, protected by
    ``ARIA2_SECRET`` and saving into ``./downloads`` (created on demand).

    Returns:
        An ``aria2p.Client`` built from the module-level RPC settings.

    Raises:
        Exception: any failure (for example a missing ``aria2c`` binary) is
            reported with ``st.error`` and then re-raised.
    """
    # Function-scope import: the file's import block does not provide subprocess.
    import subprocess

    try:
        try:
            # pgrep exits with status 0 when at least one process matched.
            probe = subprocess.run(
                ["pgrep", "aria2c"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False,
            )
            already_running = probe.returncode == 0
        except FileNotFoundError:
            # No pgrep on this system: behave as before and try to start aria2c.
            already_running = False

        # Start the aria2 background process automatically.
        if not already_running:
            download_dir = os.path.abspath("./downloads")
            os.makedirs(download_dir, exist_ok=True)
            cmd = [
                "aria2c",
                "--enable-rpc",
                f"--rpc-listen-port={ARIA2_RPC_PORT}",
                f"--rpc-secret={ARIA2_SECRET}",
                f"--dir={download_dir}",
                "--daemon=true",
                "--max-concurrent-downloads=5",
                "--split=16",
                "--max-connection-per-server=16",
                "--file-allocation=prealloc",
            ]
            # Pass an argument list instead of a joined shell string so the
            # secret and the directory are never interpreted by a shell.
            # --daemon=true asks aria2c to detach, so no helper thread is
            # used. The exit status is deliberately not enforced (the old
            # os.system call ignored it too), but a missing binary now raises
            # FileNotFoundError instead of being silently ignored.
            subprocess.run(cmd, check=False)
            # Give the daemon a moment to open its RPC port.
            time.sleep(2)

        # Build the API client.
        return aria2p.Client(
            host=ARIA2_RPC_HOST,
            port=ARIA2_RPC_PORT,
            secret=ARIA2_SECRET,
        )
    except Exception as e:
        st.error(f"初始化失败: {str(e)}")
        raise
# Session state initialisation.
# The RPC client and the API wrapper are created only when the session does
# not hold them yet; the client must exist before the API wrapper is built.
if 'aria_client' not in st.session_state:
    st.session_state.aria_client = init_aria2()
if 'aria_api' not in st.session_state:
    st.session_state.aria_api = aria2p.API(st.session_state.aria_client)

# Download-tracking values, seeded only when the key is absent so values
# already stored in the session are left untouched.
_session_defaults = {
    'download_progress': 0,
    'downloading': False,
    'download_complete': False,
    'download_filename': None,
    'error_message': None,
}
for _state_key, _initial_value in _session_defaults.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
def download_torrent(magnet_link):
    """Download ``magnet_link`` through aria2 and publish progress in session state.

    Updates ``download_progress`` (a percentage), ``downloading``,
    ``download_complete``, ``download_filename`` and ``error_message`` on
    ``st.session_state``. Failures are stored in ``error_message`` instead of
    being raised, and ``downloading`` is always cleared when the function
    returns.

    Args:
        magnet_link: Magnet URI passed to ``aria_api.add_magnet``.
    """
    try:
        api = st.session_state.aria_api
        # Queue the download task.
        download = api.add_magnet(
            magnet_link,
            options={
                "max-download-limit": "0",
                "seed-time": "0",
                "bt-detach-seed-only": "true"
            }
        )
        st.session_state.error_message = None
        st.session_state.downloading = True

        # Progress monitoring.
        while True:
            download.update()

            if not st.session_state.downloading:
                # The flag was cleared elsewhere (the cancel button does this):
                # stop the aria2 task and leave the stored message untouched.
                download.remove(force=True)
                return

            if download.is_active or download.is_waiting:
                # aria2p already reports progress as a percentage, so it must
                # not be multiplied by 100 again; clamp for safety.
                st.session_state.download_progress = min(
                    100.0, max(0.0, download.progress)
                )
                # Polling interval.
                time.sleep(1)
                continue

            if download.is_complete and download.followed_by_ids:
                # A magnet link first produces a metadata-only task; the real
                # payload is a follow-up task, so keep tracking that one.
                download = api.get_download(download.followed_by_ids[0])
                continue

            break

        if download.is_complete:
            st.session_state.download_progress = 100.0
            st.session_state.download_complete = True
            st.session_state.download_filename = download.files[0].path
        else:
            # error_message can be empty (e.g. paused or removed task); fall
            # back to the status so the user never sees "None".
            raise Exception(
                download.error_message or f"下载未完成 (状态: {download.status})"
            )
    except Exception as e:
        st.session_state.error_message = f"下载失败: {str(e)}"
    finally:
        st.session_state.downloading = False
# Page layout.
st.title("磁力链接下载器 🧲 (Aria2版)")

with st.form("magnet_form"):
    magnet_link = st.text_input("输入磁力链接:", placeholder="magnet:?xt=urn:btih:...")
    submitted = st.form_submit_button("开始下载")

if submitted and magnet_link:
    if st.session_state.downloading:
        st.warning("当前已有下载任务在进行中")
    else:
        # Reset the state left over from a previous download.
        st.session_state.download_progress = 0
        st.session_state.download_complete = False
        st.session_state.download_filename = None
        st.session_state.error_message = None
        st.session_state.downloading = True

        # Run the download in a background thread so this script run can
        # finish rendering. The helper previously called here is not defined
        # in this file, so the thread is started directly.
        worker = threading.Thread(
            target=download_torrent,
            args=(magnet_link,),
            daemon=True,
        )
        # The worker reads and writes st.session_state; Streamlit needs the
        # script run context attached to a thread for that to reach this
        # session. NOTE(review): the import path is version dependent -
        # confirm against the deployed Streamlit release.
        try:
            from streamlit.runtime.scriptrunner import add_script_run_ctx
        except ImportError:
            add_script_run_ctx = None
        if add_script_run_ctx is not None:
            add_script_run_ctx(worker)
        worker.start()

if st.session_state.downloading:
    st.info("下载状态")
    progress_col, control_col = st.columns([4, 1])
    # st.progress only accepts integers in 0..100, so clamp the stored value.
    shown_progress = min(100.0, max(0.0, float(st.session_state.download_progress)))
    with progress_col:
        progress_bar = st.progress(int(shown_progress))
        st.write(f"当前进度:{shown_progress:.1f}%")
    with control_col:
        if st.button("取消下载"):
            st.session_state.downloading = False
            st.session_state.download_complete = False
            st.session_state.error_message = "下载已取消"
            # st.experimental_rerun is absent from newer Streamlit releases;
            # prefer st.rerun when it exists and fall back otherwise.
            (getattr(st, "rerun", None) or st.experimental_rerun)()
    # NOTE(review): nothing here triggers periodic reruns, so the progress
    # shown only changes when the script reruns for another reason.

if st.session_state.error_message:
    st.error(st.session_state.error_message)

if st.session_state.download_complete:
    st.success("下载完成!✅")
    if st.session_state.download_filename and Path(st.session_state.download_filename).exists():
        file_path = Path(st.session_state.download_filename)
        file_size = file_path.stat().st_size
        st.write(f"文件大小:{file_size/1024/1024:.2f} MB")
        with open(file_path, "rb") as f:
            st.download_button(
                label="下载文件",
                data=f,
                file_name=file_path.name,
                mime="application/octet-stream"
            )
    else:
        st.error("文件不存在,可能下载失败")
# Usage notes shown at the bottom of the page.
# (A stray trailing "|" after the closing parenthesis made this statement a
# syntax error; it has been removed.)
st.markdown("---")
st.info("""
**使用说明:**
1. 确保已安装 aria2:`brew install aria2` (macOS) 或 `sudo apt install aria2` (Linux)
2. 首次运行会自动启动 aria2 后台服务
3. 下载文件将保存到当前目录的 downloads 文件夹
""")