BT / app.py
Ethscriptions's picture
Update app.py
ceae804 verified
raw
history blame
2.06 kB
import streamlit as st
import libtorrent as lt
import os
import time
def download_magnet(magnet_uri, download_path):
    """Download the torrent behind a magnet link, showing progress in Streamlit.

    Blocks until the torrent handle reports that it is seeding, i.e. the
    download has finished.

    Args:
        magnet_uri: Magnet link handed to ``lt.add_magnet_uri``.
        download_path: Directory passed to libtorrent as ``save_path``.

    Returns:
        ``download_path`` joined with the name reported by the torrent handle.
    """
    # Create the session and ask it to listen on a port in 6881-6891.
    ses = lt.session()
    ses.listen_on(6881, 6891)

    # Add the magnet link.
    # NOTE(review): storage_mode_t(2) is a raw enum value; which mode it
    # selects depends on the installed libtorrent version - confirm.
    params = {
        'save_path': download_path,
        'storage_mode': lt.storage_mode_t(2),
    }
    handle = lt.add_magnet_uri(ses, magnet_uri, params)
    st.write(f"正在下载:{handle.name()}")

    # Wait for the metadata to arrive. Sleep between polls so the wait does
    # not spin a CPU core at 100%.
    while not handle.has_metadata():
        time.sleep(0.1)

    # Total payload size, used to turn byte counts into a fraction.
    file_info = handle.get_torrent_info()
    total_size = file_info.total_size()

    # One progress bar and one status line, both updated in place so the page
    # does not grow by a line every second.
    progress_bar = st.progress(0)
    status_line = st.empty()

    # Poll once per second until the download is complete.
    while not handle.is_seed():
        s = handle.status()
        if total_size > 0:
            # Clamp to the 0.0-1.0 range that a float progress value must
            # stay within.
            progress = min(max(s.total_done / total_size, 0.0), 1.0)
        else:
            # Nothing to download; avoid dividing by zero.
            progress = 1.0
        download_rate = s.download_rate / 1000  # bytes/s -> KB/s
        # `or 1` avoids a division by zero while the transfer is stalled.
        remaining_time = (total_size - s.total_done) / (s.download_rate or 1)
        progress_bar.progress(progress)
        status_line.write(f"下载进度:{progress * 100:.2f}% 下载速率:{download_rate:.2f} KB/s 剩余时间:{remaining_time:.2f} 秒")
        time.sleep(1)

    # The loop can exit between polls, so make sure the bar ends up full.
    progress_bar.progress(1.0)

    # Download finished: return the path of the downloaded content.
    return os.path.join(download_path, handle.name())
def main():
    """Render the Streamlit page: read a magnet link, download it, offer the result.

    After a magnet link is entered, the torrent is downloaded into a
    ``downloads`` directory under the current working directory, a download
    button for the result is shown, and the downloaded data is removed from
    disk again.
    """
    # Local import: only the multi-file handling and cleanup below need it.
    import shutil

    st.title("磁力链接下载器")
    magnet_uri = st.text_input("请输入磁力链接:")
    if not magnet_uri:
        return

    download_path = os.path.join(os.getcwd(), "downloads")
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(download_path, exist_ok=True)

    # NOTE(review): Streamlit re-runs the script on widget interaction, so
    # clicking the button below presumably starts the download again while
    # the text input still holds the magnet link - confirm and consider
    # caching the result in st.session_state.
    file_path = download_magnet(magnet_uri, download_path)
    st.write(f"下载完成:{file_path}")

    # A multi-file torrent presumably yields a directory here, which cannot
    # be opened as a single file, so it is packed into a zip archive first.
    is_directory = os.path.isdir(file_path)
    archive_path = None
    try:
        if is_directory:
            # Creates "<file_path>.zip" next to the directory.
            # NOTE(review): on older Python versions make_archive changes
            # the working directory temporarily while it runs.
            archive_path = shutil.make_archive(
                file_path,
                "zip",
                root_dir=download_path,
                base_dir=os.path.basename(file_path),
            )
        serve_path = archive_path or file_path
        mime = "application/zip" if is_directory else "application/octet-stream"

        # NOTE(review): deleting the file right after creating the button
        # assumes Streamlit reads the handle's content immediately - confirm.
        with open(serve_path, "rb") as f:
            st.download_button(
                label="下载文件",
                data=f,
                file_name=os.path.basename(serve_path),
                mime=mime,
            )
    finally:
        # Remove the downloaded data (and the archive, if one was built) even
        # when building the button failed, so nothing is left on disk.
        if is_directory:
            shutil.rmtree(file_path, ignore_errors=True)
        elif os.path.isfile(file_path):
            os.remove(file_path)
        if archive_path and os.path.isfile(archive_path):
            os.remove(archive_path)
# Script entry point: build the page only when this file is executed as the
# main module, not when it is imported.
if __name__ == "__main__":
    main()