|
import asyncio
import json
import logging
import os
import secrets
from typing import Union, Any, Dict, List, Optional

import httpx
from fastapi import (
    FastAPI,
    APIRouter,
    Depends,
    Request,
    Body,
    Response,
    HTTPException,
    status,
)
from fastapi.responses import StreamingResponse, HTMLResponse, JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware
from pikpakapi import PikPakApi
from pydantic import BaseModel, Extra
|
|
|
|
|
class PostRequest(BaseModel):
    # Generic request model: it declares no fields of its own and relies on
    # the Config below to accept whatever keys the caller sends.
    # (Comments are used instead of a docstring so the generated schema
    # description stays unchanged.)

    class Config:
        # Accept fields that are not declared on the model.
        extra = Extra.allow
|
|
|
|
|
class FileRequest(BaseModel):
    # Request body of the file-listing endpoint (see get_files). Every field
    # has a default, so an empty JSON object is a valid request.
    # (Comments are used instead of a docstring so the generated schema
    # description stays unchanged.)

    class Config:
        # Accept fields that are not declared on the model.
        extra = Extra.allow

    # Page size passed to the client's file_list call.
    size: int = 100
    # Folder to list; defaults to the empty string.
    parent_id: str | None = ""
    # Paging cursor; defaults to the empty string.
    next_page_token: str | None = ""
    # Extra filters forwarded unchanged to the client.
    additional_filters: Dict | None = {}
|
|
|
|
|
class OfflineRequest(BaseModel):
    # Request body of the "add offline task" endpoint (see offline).
    # (Comments are used instead of a docstring so the generated schema
    # description stays unchanged.)

    class Config:
        # Accept fields that are not declared on the model.
        extra = Extra.allow

    # URL handed to the client's offline_download call.
    file_url: str = ""
    # Destination folder; defaults to the empty string.
    parent_id: str | None = ""
    # Name passed along with the task; defaults to the empty string.
    name: str | None = ""
|
|
|
|
|
# Bearer-token extractor used as a dependency by verify_token.
security = HTTPBearer()

# Required settings are read from the environment at import time so a
# misconfigured deployment fails immediately. An empty string is treated the
# same as a missing variable: an empty secret or credential can never work,
# so starting up with one would only hide the misconfiguration.
SECRET_TOKEN = os.getenv("SECRET_TOKEN")
if not SECRET_TOKEN:
    raise ValueError("请在环境变量中设置SECRET_TOKEN,确保安全!")

THUNDERX_USERNAME = os.getenv("THUNDERX_USERNAME")
if not THUNDERX_USERNAME:
    raise ValueError("请在环境变量中设置THUNDERX_USERNAME,用户名【邮箱】用来登陆!")

THUNDERX_PASSWORD = os.getenv("THUNDERX_PASSWORD")
if not THUNDERX_PASSWORD:
    raise ValueError("请在环境变量中设置THUNDERX_PASSWORD,密码用来登陆!")

# Optional setting; None when the variable is not set.
PROXY_URL = os.getenv("PROXY_URL")
|
|
|
|
|
async def verify_token(
    request: Request, credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Reject the request unless it carries the configured bearer token.

    Args:
        request: The incoming request. It is not inspected here; the
            parameter is kept so the dependency signature stays unchanged.
        credentials: Scheme and token supplied through the ``security``
            dependency.

    Raises:
        HTTPException: 401 when the scheme is not Bearer or when the token
            differs from ``SECRET_TOKEN``.
    """
    # Authentication scheme names are case-insensitive, so "bearer" and
    # "BEARER" must be accepted as well as "Bearer".
    if credentials.scheme.lower() != "bearer":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication scheme",
        )

    # Compare in constant time so response timing does not reveal how much of
    # a guessed token is correct. Both sides are encoded to bytes because
    # compare_digest raises TypeError for non-ASCII str input.
    supplied = credentials.credentials.encode("utf-8")
    expected = SECRET_TOKEN.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token"
        )
|
|
|
|
|
# The ASGI application object.
app = FastAPI()

# CORS: any origin, method and header is allowed, and credentials are enabled.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Routes on api_router all run verify_token first; front_router has no
# router-level dependency.
front_router = APIRouter()
api_router = APIRouter(dependencies=[Depends(verify_token)])

# Templates are loaded from ./templates and use {[ ... ]} for variables
# instead of Jinja's default delimiters.
# NOTE(review): presumably chosen to avoid clashing with a front-end
# framework that uses {{ ... }} - confirm against the templates.
templates = Jinja2Templates(
    directory="templates",
    variable_end_string="]}",
    variable_start_string="{[",
)

# Shared client instance; assigned by init_client on application startup.
THUNDERX_CLIENT = None
|
|
|
|
|
async def log_token(THUNDERX_CLIENT, extra_data):
    """Log the client's encoded token together with ``extra_data``.

    Registered as the client's ``token_refresh_callback`` in ``init_client``.

    Args:
        THUNDERX_CLIENT: Object exposing an ``encoded_token`` attribute. The
            parameter shadows the module-level name inside this function.
        extra_data: Arbitrary value appended to the log line.
    """
    # NOTE(review): this writes the token itself to the log at INFO level.
    message = f"Token: {THUNDERX_CLIENT.encoded_token}, Extra Data: {extra_data}"
    logging.info(message)
|
|
|
|
|
@app.on_event("startup")
async def init_client():
    """Create the shared ``THUNDERX_CLIENT`` when the application starts.

    A session saved on disk is reused when one can be read. Otherwise the
    client is built from ``THUNDERX_USERNAME`` / ``THUNDERX_PASSWORD``, logs
    in, refreshes its access token, and is saved to ``thunderx.json``.
    Afterwards the user info and the result of ``events()`` are printed.
    """
    global THUNDERX_CLIENT

    # The session was previously written to "thunderx.json" but looked up as
    # "thunderx.txt", so the saved file was never found and every start logged
    # in again. Both names are now checked; "thunderx.txt" still comes first
    # so a file provisioned under the old name keeps taking precedence.
    session_file = "thunderx.json"
    saved_session = None
    for candidate in ("thunderx.txt", session_file):
        if not os.path.exists(candidate):
            continue
        try:
            with open(candidate, "r", encoding="utf-8") as f:
                saved_session = json.load(f)
        except (OSError, json.JSONDecodeError) as exc:
            # An unreadable or truncated file must not block startup; fall
            # through to the next candidate or to a fresh login.
            logging.warning("Ignoring unreadable session file %s: %s", candidate, exc)
            continue
        break

    if saved_session is not None:
        # NOTE(review): a restored session may carry an expired token; this
        # relies on the client refreshing it on demand - confirm.
        THUNDERX_CLIENT = PikPakApi.from_dict(saved_session)
    else:
        THUNDERX_CLIENT = PikPakApi(
            username=THUNDERX_USERNAME,
            password=THUNDERX_PASSWORD,
            httpx_client_args=None,
            token_refresh_callback=log_token,
            token_refresh_callback_kwargs={"extra_data": "test"},
        )
        await THUNDERX_CLIENT.login()
        await THUNDERX_CLIENT.refresh_access_token()
        with open(session_file, "w", encoding="utf-8") as f:
            f.write(json.dumps(THUNDERX_CLIENT.to_dict(), indent=4))

    # NOTE(review): the original indentation was lost; these diagnostics are
    # assumed to run after either branch. They print account details to stdout.
    print(json.dumps(THUNDERX_CLIENT.get_user_info(), indent=4))

    print(
        json.dumps(
            await THUNDERX_CLIENT.events(),
            indent=4,
        )
    )
|
|
|
|
|
@front_router.get(
    "/",
    response_class=HTMLResponse,
    tags=["前端"],
    summary="前台页面",
    description="前台管理页面,需要在设置里设置SECRET_TOKEN才能正常请求",
)
async def home(request: Request):
    """Render ``index.html`` with the current request as its only context value."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
|
|
|
|
|
@api_router.post(
    "/files",
    tags=["文件"],
    summary="文件列表",
    description="获取文件列表",
)
async def get_files(item: FileRequest):
    """Return the client's ``file_list`` result for the options in ``item``.

    ``size``, ``parent_id``, ``next_page_token`` and ``additional_filters``
    are passed positionally, in that order.
    """
    listing = await THUNDERX_CLIENT.file_list(
        item.size,
        item.parent_id,
        item.next_page_token,
        item.additional_filters,
    )
    return listing
|
|
|
|
|
@api_router.get(
    "/files/{file_id}",
    tags=["文件"],
    summary="文件信息",
    description="获取文件信息",
)
async def get_file_info(file_id: str):
    """Return the client's ``get_download_url`` result for ``file_id``."""
    details = await THUNDERX_CLIENT.get_download_url(file_id)
    return details
|
|
|
|
|
@api_router.post(
    "/emptytrash",
    tags=["文件"],
    summary="清空回收站",
    description="清空回收站【慎用】",
)
async def emptytrash():
    """Call the client's ``emptytrash`` and return whatever it yields."""
    outcome = await THUNDERX_CLIENT.emptytrash()
    return outcome
|
|
|
|
|
|
|
@api_router.post(
    "/get_share_list",
    tags=["分享"],
    summary="获取账号分享列表",
    description="获取账号分享列表",
)
async def get_share_list(page_token: str | None = None):
    """Return the client's ``get_share_list`` result for ``page_token``."""
    shares = await THUNDERX_CLIENT.get_share_list(page_token)
    return shares
|
|
|
|
|
@api_router.post(
    "/file_batch_share", summary="创建分享", description="创建分享", tags=["分享"]
)
async def file_batch_share(
    # Declared as explicitly optional: the default is None, and this matches
    # how ``restore`` declares its optional list parameter.
    ids: List[str] | None = None,
    need_password: bool | None = False,
    expiration_days: int | None = -1,
):
    """Return the client's ``file_batch_share`` result.

    Args:
        ids: Values passed as the first argument; ``None`` when omitted.
        need_password: Passed through unchanged; defaults to ``False``.
        expiration_days: Passed through unchanged; defaults to ``-1``.
    """
    return await THUNDERX_CLIENT.file_batch_share(ids, need_password, expiration_days)
|
|
|
|
|
@api_router.post(
    "/file_batch_delete",
    tags=["分享"],
    summary="取消分享",
    description="取消分享",
)
async def file_batch_delete(ids: List[str]):
    """Return the client's ``file_batch_delete`` result for ``ids``."""
    outcome = await THUNDERX_CLIENT.file_batch_delete(ids)
    return outcome
|
|
|
|
|
@api_router.post(
    "/get_share_folder",
    tags=["分享"],
    summary="获取分享信息",
    description="获取分享信息",
)
async def get_share_folder(
    share_id: str, pass_code_token: str | None = None, parent_id: str | None = None
):
    """Return the client's ``get_share_folder`` result.

    ``share_id``, ``pass_code_token`` and ``parent_id`` are passed
    positionally, in that order.
    """
    folder = await THUNDERX_CLIENT.get_share_folder(
        share_id,
        pass_code_token,
        parent_id,
    )
    return folder
|
|
|
|
|
@api_router.post(
    "/restore",
    tags=["分享"],
    summary="转存分享文件",
    description="转存分享文件",
)
async def restore(
    share_id: str, pass_code_token: str | None = None, file_ids: List[str] | None = None
):
    """Return the client's ``restore`` result.

    ``share_id``, ``pass_code_token`` and ``file_ids`` are passed
    positionally, in that order.
    """
    outcome = await THUNDERX_CLIENT.restore(
        share_id,
        pass_code_token,
        file_ids,
    )
    return outcome
|
|
|
|
|
|
|
|
|
|
|
@api_router.get(
    "/offline",
    tags=["离线任务"],
    summary="离线任务列表",
    description="离线任务列表",
)
async def offline_list(size: int = 10000, next_page_token: str | None = None):
    """Return the client's ``offline_list`` result.

    ``size`` and ``next_page_token`` come from the caller; ``phase`` is always
    passed as ``None``.
    """
    query = {
        "size": size,
        "next_page_token": next_page_token,
        "phase": None,
    }
    return await THUNDERX_CLIENT.offline_list(**query)
|
|
|
|
|
@api_router.post(
    "/offline",
    tags=["离线任务"],
    summary="添加离线任务",
    description="添加离线任务",
)
async def offline(item: OfflineRequest):
    """Return the client's ``offline_download`` result for ``item``.

    ``file_url``, ``parent_id`` and ``name`` are passed positionally, in that
    order.
    """
    task = await THUNDERX_CLIENT.offline_download(
        item.file_url,
        item.parent_id,
        item.name,
    )
    return task
|
|
|
|
|
@api_router.post(
    "/delete_tasks",
    tags=["离线任务"],
    summary="删除离线任务",
    description="删除离线任务",
)
async def delete_tasks(task_ids: List[str], delete_files: bool = False):
    """Return the client's ``delete_tasks`` result.

    ``task_ids`` and ``delete_files`` are passed positionally, in that order.
    """
    outcome = await THUNDERX_CLIENT.delete_tasks(task_ids, delete_files)
    return outcome
|
|
|
|
|
|
|
@api_router.get(
    "/userinfo",
    tags=["账号"],
    summary="用户信息",
    description="获取用户登陆信息",
)
async def userinfo():
    """Return the client's ``get_user_info`` result."""
    # Unlike the sibling endpoints, this client call is not awaited.
    info = THUNDERX_CLIENT.get_user_info()
    return info
|
|
|
|
|
@api_router.get(
    "/quota",
    tags=["账号"],
    summary="空间使用信息",
    description="获取空间使用信息",
)
async def quota_info():
    """Return the client's ``get_quota_info`` result."""
    quota = await THUNDERX_CLIENT.get_quota_info()
    return quota
|
|
|
|
|
@api_router.get(
    "/invite_code",
    tags=["账号"],
    summary="查看邀请码",
    description="查看邀请码",
)
async def get_invite_code():
    """Return the client's ``get_invite_code`` result."""
    code = await THUNDERX_CLIENT.get_invite_code()
    return code
|
|
|
|
|
# Register the routers on the app: the front-end router first, then the
# token-protected API router.
for _router in (front_router, api_router):
    app.include_router(_router)
|
|