t / main.py
bot
完善分享功能
f20376d
raw
history blame
8 kB
import os
import asyncio
import json
import logging
import httpx
from pikpakapi import PikPakApi
from typing import Union, Any, Dict, List, Optional
from fastapi import (
FastAPI,
APIRouter,
Depends,
Request,
Body,
Response,
HTTPException,
status,
Request,
)
from fastapi.responses import StreamingResponse, HTMLResponse, JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Extra
class PostRequest(BaseModel):
class Config:
extra = Extra.allow
class FileRequest(BaseModel):
size: int = 100
parent_id: str | None = ""
next_page_token: str | None = ""
additional_filters: Dict | None = {}
class Config:
extra = Extra.allow
class OfflineRequest(BaseModel):
file_url: str = ""
parent_id: str | None = ""
name: str | None = ""
class Config:
extra = Extra.allow
security = HTTPBearer()
# SECRET_TOKEN = "SECRET_TOKEN"
SECRET_TOKEN = os.getenv("SECRET_TOKEN")
if SECRET_TOKEN is None:
raise ValueError("请在环境变量中设置SECRET_TOKEN,确保安全!")
THUNDERX_USERNAME = os.getenv("THUNDERX_USERNAME")
if THUNDERX_USERNAME is None:
raise ValueError("请在环境变量中设置THUNDERX_USERNAME,用户名【邮箱】用来登陆!")
THUNDERX_PASSWORD = os.getenv("THUNDERX_PASSWORD")
if THUNDERX_PASSWORD is None:
raise ValueError("请在环境变量中设置THUNDERX_PASSWORD,密码用来登陆!")
PROXY_URL = os.getenv("PROXY_URL")
async def verify_token(
request: Request, credentials: HTTPAuthorizationCredentials = Depends(security)
):
# excluded_paths = ["/"] # 需要排除的路径列表
# if request.url.path in excluded_paths:
# return # 直接跳过验证
# 验证Bearer格式
if credentials.scheme != "Bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication scheme",
)
# 验证令牌内容
if credentials.credentials != SECRET_TOKEN:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token"
)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
api_router = APIRouter(dependencies=[Depends(verify_token)])
front_router = APIRouter()
templates = Jinja2Templates(
directory="templates", variable_start_string="{[", variable_end_string="]}"
)
THUNDERX_CLIENT = None
async def log_token(THUNDERX_CLIENT, extra_data):
logging.info(f"Token: {THUNDERX_CLIENT.encoded_token}, Extra Data: {extra_data}")
@app.on_event("startup")
async def init_client():
global THUNDERX_CLIENT
if not os.path.exists("thunderx.txt"):
THUNDERX_CLIENT = PikPakApi(
username=THUNDERX_USERNAME,
password=THUNDERX_PASSWORD,
httpx_client_args=None,
token_refresh_callback=log_token,
token_refresh_callback_kwargs={"extra_data": "test"},
)
await THUNDERX_CLIENT.login()
await THUNDERX_CLIENT.refresh_access_token()
with open("thunderx.json", "w") as f:
f.write(json.dumps(THUNDERX_CLIENT.to_dict(), indent=4))
else:
with open("thunderx.txt", "r") as f:
data = json.load(f)
THUNDERX_CLIENT = PikPakApi.from_dict(data)
# await client.refresh_access_token()
print(json.dumps(THUNDERX_CLIENT.get_user_info(), indent=4))
print(
json.dumps(
await THUNDERX_CLIENT.events(),
indent=4,
)
)
@front_router.get(
"/",
response_class=HTMLResponse,
summary="前台页面",
description="前台管理页面,需要在设置里设置SECRET_TOKEN才能正常请求",
tags=["前端"],
)
async def home(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@api_router.post(
"/files", summary="文件列表", description="获取文件列表", tags=["文件"]
)
async def get_files(item: FileRequest):
return await THUNDERX_CLIENT.file_list(
item.size, item.parent_id, item.next_page_token, item.additional_filters
)
@api_router.get(
"/files/{file_id}", summary="文件信息", description="获取文件信息", tags=["文件"]
)
async def get_file_info(file_id: str):
return await THUNDERX_CLIENT.get_download_url(file_id)
@api_router.post(
"/emptytrash", summary="清空回收站", description="清空回收站【慎用】", tags=["文件"]
)
async def emptytrash():
return await THUNDERX_CLIENT.emptytrash()
############## 分享 ################
@api_router.post(
"/get_share_list",
summary="获取账号分享列表",
description="获取账号分享列表",
tags=["分享"],
)
async def get_share_list(page_token: str | None = None):
return await THUNDERX_CLIENT.get_share_list(page_token)
@api_router.post(
"/file_batch_share", summary="创建分享", description="创建分享", tags=["分享"]
)
async def file_batch_share(
ids: List[str] = None,
need_password: bool | None = False,
expiration_days: int | None = -1,
):
return await THUNDERX_CLIENT.file_batch_share(ids, need_password, expiration_days)
@api_router.post(
"/file_batch_delete", summary="取消分享", description="取消分享", tags=["分享"]
)
async def file_batch_delete(ids: List[str]):
return await THUNDERX_CLIENT.file_batch_delete(ids)
@api_router.post(
"/get_share_folder",
summary="获取分享信息",
description="获取分享信息",
tags=["分享"],
)
async def get_share_folder(
share_id: str, pass_code_token: str | None = None, parent_id: str | None = None
):
return await THUNDERX_CLIENT.get_share_folder(share_id, pass_code_token, parent_id)
@api_router.post(
"/restore", summary="转存分享文件", description="转存分享文件", tags=["分享"]
)
async def restore(
share_id: str, pass_code_token: str | None = None, file_ids: List[str] | None = None
):
return await THUNDERX_CLIENT.restore(share_id, pass_code_token, file_ids)
############## 离线任务 ################
@api_router.get(
"/offline", summary="离线任务列表", description="离线任务列表", tags=["离线任务"]
)
async def offline_list(size: int = 10000, next_page_token: str | None = None):
return await THUNDERX_CLIENT.offline_list(
size=size,
next_page_token=next_page_token,
phase=None,
)
@api_router.post(
"/offline", summary="添加离线任务", description="添加离线任务", tags=["离线任务"]
)
async def offline(item: OfflineRequest):
return await THUNDERX_CLIENT.offline_download(
item.file_url, item.parent_id, item.name
)
@api_router.post(
"/delete_tasks",
summary="删除离线任务",
description="删除离线任务",
tags=["离线任务"],
)
async def delete_tasks(task_ids: List[str], delete_files: bool = False):
return await THUNDERX_CLIENT.delete_tasks(task_ids, delete_files)
############## 账号 ################
@api_router.get(
"/userinfo", summary="用户信息", description="获取用户登陆信息", tags=["账号"]
)
async def userinfo():
return THUNDERX_CLIENT.get_user_info()
@api_router.get(
"/quota", summary="空间使用信息", description="获取空间使用信息", tags=["账号"]
)
async def quota_info():
return await THUNDERX_CLIENT.get_quota_info()
@api_router.get(
"/invite_code", summary="查看邀请码", description="查看邀请码", tags=["账号"]
)
async def get_invite_code():
return await THUNDERX_CLIENT.get_invite_code()
app.include_router(front_router)
app.include_router(api_router)