Spaces:
Sleeping
Sleeping
import streamlit as st | |
import libtorrent as lt | |
import time | |
import os | |
import tempfile | |
def download_magnet_link(magnet_link): | |
""" | |
使用libtorrent下载磁力链接,并显示下载进度。 | |
参数: | |
magnet_link (str): 磁力链接 | |
返回: | |
str: 下载文件的本地路径,如果下载失败则返回 None | |
""" | |
try: | |
ses = lt.session() | |
ses.listen_on(6881, 6891) # 监听端口范围,用于BT下载 | |
params = lt.parse_magnet_uri(magnet_link) | |
handle = ses.add_torrent(params) | |
progress_bar = st.empty() # 创建一个空的占位符,用于显示进度条 | |
progress_text = st.empty() # 创建一个空的占位符,用于显示进度文字 | |
status = handle.status() | |
while not status.has_metadata(): # 等待获取 torrent 元数据 | |
status = handle.status() | |
time.sleep(1) | |
torrent_info = handle.get_torrent_info() | |
file_name = torrent_info.files()[0].path # 假设只下载第一个文件,您可以根据需求修改逻辑 | |
temp_dir = tempfile.TemporaryDirectory() # 创建临时目录用于存放下载文件 | |
download_path = os.path.join(temp_dir.name, file_name) # 拼接下载文件的完整路径 | |
handle.move_storage(temp_dir.name) # 设置下载文件的存储路径为临时目录 | |
while status.state != lt.torrent_status.seeding: # 循环直到下载完成 (seeding 状态) | |
status = handle.status() | |
progress_percent = int(status.progress * 100) # 计算下载进度百分比 | |
progress_bar.progress(progress_percent / 100) # 更新进度条 | |
progress_text.text(f"下载进度: {progress_percent}% - 速度: {status.download_payload_rate / 1000:.2f} KB/s - 已下载: {status.total_done / (1024 * 1024):.2f} MB / 总大小: {status.total / (1024 * 1024):.2f} MB") # 更新进度文字 | |
time.sleep(0.5) # 每0.5秒更新一次进度 | |
ses.remove_torrent(handle) # 下载完成后移除 torrent 任务 | |
ses.pause() # 暂停 libtorrent 会话 | |
return download_path # 返回下载文件的完整路径 | |
except Exception as e: | |
st.error(f"下载失败: {e}") # 显示错误信息 | |
return None | |
st.title("磁力链接下载器") # Streamlit 应用标题 | |
magnet_link_input = st.text_input("请输入磁力链接:") # 用户输入磁力链接的输入框 | |
if magnet_link_input: # 当用户输入磁力链接后 | |
download_path = download_magnet_link(magnet_link_input) # 调用下载函数 | |
if download_path: # 如果下载成功 | |
st.success("下载完成!文件已保存在临时目录,您可以下载文件。") # 显示下载成功信息 | |
try: | |
with open(download_path, "rb") as f: # 以二进制读取下载的文件 | |
file_data = f.read() # 读取文件内容 | |
file_name = os.path.basename(download_path) # 获取文件名 | |
st.download_button( # 创建下载按钮 | |
label="下载文件", # 按钮标签 | |
data=file_data, # 下载的数据,即文件内容 | |
file_name=file_name, # 下载的文件名 | |
mime="application/octet-stream" # 设置MIME类型,通用二进制流 | |
) | |
except Exception as e: | |
st.error(f"无法读取文件或创建下载链接: {e}") # 显示文件读取或下载链接创建失败的错误信息 | |