# Spaces: Running
import os
import shutil
import time

import libtorrent as lt
import streamlit as st
def download_magnet(magnet_uri, download_path):
    """Download the torrent behind a magnet link, reporting progress in Streamlit.

    Blocks until the torrent handle reports that it is seeding, polling its
    status once per second and updating a progress bar and a status line.

    Args:
        magnet_uri: Magnet link to download.
        download_path: Directory handed to libtorrent as ``save_path``.

    Returns:
        ``download_path`` joined with the torrent's name.
    """
    # Create the session.
    ses = lt.session()
    ses.listen_on(6881, 6891)

    # Add the magnet link.
    params = {
        'save_path': download_path,
        'storage_mode': lt.storage_mode_t(2),
    }
    handle = lt.add_magnet_uri(ses, magnet_uri, params)
    st.write(f"正在下载:{handle.name()}")

    # Wait for the metadata to arrive. Sleep between polls so the wait does
    # not spin a CPU core at 100%.
    while not handle.has_metadata():
        time.sleep(0.1)

    # Fetch the torrent's file information.
    file_info = handle.get_torrent_info()
    total_size = file_info.total_size()

    # Create the progress bar and a single placeholder for the status text,
    # so each poll overwrites the previous line instead of appending a new
    # one to the page every second.
    progress_bar = st.progress(0)
    status_line = st.empty()

    # Poll until the download is complete.
    while not handle.is_seed():
        s = handle.status()
        # Guard against a zero-byte torrent and keep the fraction inside the
        # 0.0-1.0 range that the progress bar accepts.
        progress = s.total_done / total_size if total_size else 0.0
        progress = min(max(progress, 0.0), 1.0)
        download_rate = s.download_rate / 1000  # convert to KB/s
        # ``or 1`` avoids dividing by zero while the transfer is stalled.
        remaining_time = (total_size - s.total_done) / (s.download_rate or 1)
        progress_bar.progress(progress)
        status_line.write(f"下载进度:{progress * 100:.2f}% 下载速率:{download_rate:.2f} KB/s 剩余时间:{remaining_time:.2f} 秒")
        time.sleep(1)

    # The loop exits before drawing the final state; show the bar as full.
    progress_bar.progress(1.0)

    # Download finished: return the path of the payload.
    return os.path.join(download_path, handle.name())
def main():
    """Render the downloader page and serve the finished download.

    Reads a magnet link from a text input. Once one is entered, downloads it
    into ``./downloads`` via ``download_magnet``, offers the result through a
    download button, and then deletes the downloaded data from disk.

    A payload that is a directory (multi-file torrent) is packed into a zip
    archive first, because a directory cannot be opened as a file.
    """
    st.title("磁力链接下载器")
    magnet_uri = st.text_input("请输入磁力链接:")
    if not magnet_uri:
        return

    download_path = os.path.join(os.getcwd(), "downloads")
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(download_path, exist_ok=True)

    file_path = download_magnet(magnet_uri, download_path)
    st.write(f"下载完成:{file_path}")

    payload_path = file_path
    mime = "application/octet-stream"
    try:
        if os.path.isdir(file_path):
            # Archive the directory next to itself as "<name>.zip".
            payload_path = shutil.make_archive(
                file_path,
                "zip",
                root_dir=download_path,
                base_dir=os.path.basename(file_path),
            )
            mime = "application/zip"

        with open(payload_path, "rb") as f:
            st.download_button(
                label="下载文件",
                data=f,
                file_name=os.path.basename(payload_path),
                mime=mime,
            )
    finally:
        # Remove the downloaded data once the button has been rendered, even
        # if rendering failed, so nothing is left behind on disk.
        if os.path.isdir(file_path):
            shutil.rmtree(file_path, ignore_errors=True)
        if os.path.isfile(payload_path):
            os.remove(payload_path)
# Start the app only when this file is executed as the main module, not when
# it is imported.
if __name__ == "__main__":
    main()