|
import asyncio |
|
import binascii |
|
import inspect |
|
import json |
|
import logging |
|
import re |
|
from base64 import b64decode, b64encode |
|
from hashlib import md5 |
|
from types import NoneType |
|
from typing import Any, Dict, List, Optional, Callable, Coroutine |
|
from urllib.parse import urljoin |
|
|
|
import httpx |
|
|
|
from .PikpakException import PikpakException, PikpakRetryException |
|
from .enums import DownloadStatus |
|
from .utils import ( |
|
CLIENT_ID, |
|
CLIENT_SECRET, |
|
CLIENT_VERSION, |
|
PACKAG_ENAME, |
|
build_custom_user_agent, |
|
captcha_sign, |
|
get_timestamp, |
|
) |
|
|
|
|
|
class PikPakApi: |
|
""" |
|
PikPakApi class |
|
|
|
Attributes: |
|
PIKPAK_API_HOST: str - PikPak API host |
|
PIKPAK_USER_HOST: str - PikPak user API host |
|
|
|
username: str - username of the user |
|
password: str - password of the user |
|
encoded_token: str - encoded token of the user with access and refresh tokens |
|
access_token: str - access token of the user , expire in 7200 |
|
refresh_token: str - refresh token of the user |
|
user_id: str - user id of the user |
|
token_refresh_callback: Callable[[PikPakApi, **Any], Coroutine[Any, Any, None]] - async callback function to be called after token refresh |
|
token_refresh_callback_kwargs: Dict[str, Any] - custom arguments to be passed to the token refresh callback |
|
""" |
|
|
|
PIKPAK_API_HOST = "api-pan.xunleix.com" |
|
PIKPAK_USER_HOST = "xluser-ssl.xunleix.com" |
|
|
|
def __init__( |
|
self, |
|
username: Optional[str] = None, |
|
password: Optional[str] = None, |
|
encoded_token: Optional[str] = None, |
|
httpx_client_args: Optional[Dict[str, Any]] = None, |
|
device_id: Optional[str] = None, |
|
request_max_retries: int = 3, |
|
request_initial_backoff: float = 3.0, |
|
token_refresh_callback: Optional[Callable] = None, |
|
token_refresh_callback_kwargs: Optional[Dict[str, Any]] = None, |
|
): |
|
""" |
|
username: str - username of the user |
|
password: str - password of the user |
|
encoded_token: str - encoded token of the user with access and refresh token |
|
httpx_client_args: dict - extra arguments for httpx.AsyncClient (https://www.python-httpx.org/api/#asyncclient) |
|
device_id: str - device id to identify the device |
|
request_max_retries: int - maximum number of retries for requests |
|
request_initial_backoff: float - initial backoff time for retries |
|
token_refresh_callback: Callable[[PikPakApi, **Any], Coroutine[Any, Any, None]] - async callback function to be called after token refresh |
|
token_refresh_callback_kwargs: Dict[str, Any] - custom arguments to be passed to the token refresh callback |
|
""" |
|
|
|
self.username = username |
|
self.password = password |
|
self.encoded_token = encoded_token |
|
self.max_retries = request_max_retries |
|
self.initial_backoff = request_initial_backoff |
|
self.token_refresh_callback = token_refresh_callback |
|
self.token_refresh_callback_kwargs = token_refresh_callback_kwargs or {} |
|
|
|
self.access_token = None |
|
self.refresh_token = None |
|
self.user_id = None |
|
|
|
self.data_response = None |
|
|
|
|
|
self.device_id = ( |
|
device_id |
|
if device_id |
|
else md5(f"{self.username}{self.password}".encode()).hexdigest() |
|
) |
|
self.captcha_token = None |
|
|
|
httpx_client_args = httpx_client_args or {"timeout": 10} |
|
self.httpx_client = httpx.AsyncClient(**httpx_client_args) |
|
|
|
self._path_id_cache: Dict[str, Any] = {} |
|
|
|
self.user_agent: Optional[str] = None |
|
|
|
if self.encoded_token: |
|
self.decode_token() |
|
elif self.username and self.password: |
|
pass |
|
else: |
|
raise PikpakException("username and password or encoded_token is required") |
|
|
|
@classmethod |
|
def from_dict(cls, data: Dict[str, Any]) -> "PikPakApi": |
|
""" |
|
Create PikPakApi object from a dictionary |
|
""" |
|
params = inspect.signature(cls).parameters |
|
filtered_data = {key: data[key] for key in params if key in data} |
|
client = cls( |
|
**filtered_data, |
|
) |
|
client.__dict__.update(data) |
|
return client |
|
|
|
def to_dict(self) -> Dict[str, Any]: |
|
""" |
|
Returns the PikPakApi object as a dictionary |
|
""" |
|
data = self.__dict__.copy() |
|
|
|
keys_to_delete = [ |
|
k |
|
for k, v in data.items() |
|
if not type(v) in [str, int, float, bool, list, dict, NoneType] |
|
] |
|
for k in keys_to_delete: |
|
del data[k] |
|
return data |
|
|
|
def build_custom_user_agent(self) -> str: |
|
|
|
self.user_agent = build_custom_user_agent( |
|
device_id=self.device_id, |
|
user_id=self.user_id if self.user_id else "", |
|
) |
|
return self.user_agent |
|
|
|
def get_headers(self, access_token: Optional[str] = None) -> Dict[str, str]: |
|
""" |
|
Returns the headers to use for the requests. |
|
""" |
|
headers = { |
|
"User-Agent": ( |
|
self.build_custom_user_agent() |
|
if self.captcha_token |
|
else "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36" |
|
), |
|
"Content-Type": "application/json; charset=utf-8", |
|
} |
|
|
|
if self.access_token: |
|
headers["Authorization"] = f"Bearer {self.access_token}" |
|
if access_token: |
|
headers["Authorization"] = f"Bearer {access_token}" |
|
if self.captcha_token: |
|
headers["X-Captcha-Token"] = self.captcha_token |
|
if self.device_id: |
|
headers["X-Device-Id"] = self.device_id |
|
return headers |
|
|
|
async def _make_request( |
|
self, |
|
method: str, |
|
url: str, |
|
data: Optional[Dict[str, Any]] = None, |
|
params: Optional[Dict[str, Any]] = None, |
|
headers: Optional[Dict[str, str]] = None, |
|
) -> Dict[str, Any]: |
|
last_error = None |
|
|
|
|
|
|
|
|
|
for attempt in range(self.max_retries): |
|
try: |
|
response = await self._send_request(method, url, data, params, headers) |
|
return await self._handle_response(response) |
|
except PikpakRetryException as error: |
|
logging.info(f"Retry attempt {attempt + 1}/{self.max_retries}") |
|
last_error = error |
|
except PikpakException: |
|
raise |
|
except httpx.HTTPError as error: |
|
logging.error( |
|
f"HTTP Error on attempt {attempt + 1}/{self.max_retries}: {str(error)}" |
|
) |
|
last_error = error |
|
except Exception as error: |
|
logging.error( |
|
f"Unexpected error on attempt {attempt + 1}/{self.max_retries}: {str(error)}" |
|
) |
|
last_error = error |
|
|
|
await asyncio.sleep(self.initial_backoff * (2**attempt)) |
|
|
|
|
|
raise PikpakException(f"Max retries reached. Last error: {str(last_error)}") |
|
|
|
async def _send_request(self, method, url, data, params, headers): |
|
req_headers = headers or self.get_headers() |
|
return await self.httpx_client.request( |
|
method, |
|
url, |
|
json=data, |
|
params=params, |
|
headers=req_headers, |
|
) |
|
|
|
async def _handle_response(self, response) -> Dict[str, Any]: |
|
try: |
|
json_data = response.json() |
|
except ValueError: |
|
if response.status_code == 200: |
|
return {} |
|
raise PikpakRetryException("Empty JSON data") |
|
|
|
self.data_response = json_data |
|
|
|
if not json_data: |
|
if response.status_code == 200: |
|
return {} |
|
raise PikpakRetryException("Empty JSON data") |
|
|
|
if "error" not in json_data: |
|
return json_data |
|
|
|
if "captcha_token" in json_data: |
|
self.captcha_token = json_data["captcha_token"] |
|
|
|
if json_data["error"] == "invalid_account_or_password": |
|
raise PikpakException("Invalid username or password") |
|
|
|
if json_data.get("error_code") == 16: |
|
await self.refresh_access_token() |
|
raise PikpakRetryException("Token refreshed, please retry") |
|
|
|
raise PikpakException(json_data.get("error_description", "Unknown Error")) |
|
|
|
async def _request_get( |
|
self, |
|
url: str, |
|
params: dict = None, |
|
): |
|
return await self._make_request("get", url, params=params) |
|
|
|
async def _request_post( |
|
self, |
|
url: str, |
|
data: dict = None, |
|
headers: dict = None, |
|
): |
|
return await self._make_request("post", url, data=data, headers=headers) |
|
|
|
async def _request_patch( |
|
self, |
|
url: str, |
|
data: dict = None, |
|
): |
|
return await self._make_request("patch", url, data=data) |
|
|
|
async def _request_delete( |
|
self, |
|
url: str, |
|
params: dict = None, |
|
data: dict = None, |
|
): |
|
return await self._make_request("delete", url, params=params, data=data) |
|
|
|
def decode_token(self): |
|
"""Decodes the encoded token to update access and refresh tokens.""" |
|
try: |
|
decoded_data = json.loads(b64decode(self.encoded_token).decode()) |
|
except (binascii.Error, json.JSONDecodeError): |
|
raise PikpakException("Invalid encoded token") |
|
if not decoded_data.get("access_token") or not decoded_data.get( |
|
"refresh_token" |
|
): |
|
raise PikpakException("Invalid encoded token") |
|
self.access_token = decoded_data.get("access_token") |
|
self.refresh_token = decoded_data.get("refresh_token") |
|
|
|
def encode_token(self): |
|
"""Encodes the access and refresh tokens into a single string.""" |
|
token_data = { |
|
"access_token": self.access_token, |
|
"refresh_token": self.refresh_token, |
|
} |
|
self.encoded_token = b64encode(json.dumps(token_data).encode()).decode() |
|
|
|
async def captcha_init(self, action: str, meta: dict = None) -> Dict[str, Any]: |
|
url = f"https://{PikPakApi.PIKPAK_USER_HOST}/v1/shield/captcha/init" |
|
if not meta: |
|
t = f"{get_timestamp()}" |
|
meta = { |
|
"captcha_sign": captcha_sign(self.device_id, t), |
|
"client_version": CLIENT_VERSION, |
|
"package_name": PACKAG_ENAME, |
|
"user_id": self.user_id, |
|
"timestamp": t, |
|
} |
|
params = { |
|
"client_id": CLIENT_ID, |
|
"action": action, |
|
"device_id": self.device_id, |
|
"meta": meta, |
|
} |
|
return await self._request_post(url, data=params) |
|
|
|
async def login(self) -> None: |
|
""" |
|
Login to PikPak |
|
""" |
|
login_url = f"https://{PikPakApi.PIKPAK_USER_HOST}/v1/auth/signin" |
|
metas = {} |
|
if not self.username or not self.password: |
|
raise PikpakException("username and password are required") |
|
if re.match(r"\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*", self.username): |
|
metas["email"] = self.username |
|
elif re.match(r"\d{11,18}", self.username): |
|
metas["phone_number"] = self.username |
|
else: |
|
metas["username"] = self.username |
|
result = await self.captcha_init( |
|
action=f"POST:{login_url}", |
|
meta=metas, |
|
) |
|
captcha_token = result.get("captcha_token", "") |
|
if not captcha_token: |
|
raise PikpakException("captcha_token get failed") |
|
|
|
login_data = { |
|
"client_id": CLIENT_ID, |
|
"client_secret": CLIENT_SECRET, |
|
"password": self.password, |
|
"username": self.username, |
|
"captcha_token": captcha_token, |
|
} |
|
user_info = await self._request_post( |
|
login_url, |
|
login_data, |
|
{ |
|
"Content-Type": "application/x-www-form-urlencoded", |
|
}, |
|
) |
|
self.access_token = user_info["access_token"] |
|
self.refresh_token = user_info["refresh_token"] |
|
self.user_id = user_info["sub"] |
|
self.encode_token() |
|
|
|
async def refresh_access_token(self) -> None: |
|
""" |
|
Refresh access token |
|
""" |
|
refresh_url = f"https://{self.PIKPAK_USER_HOST}/v1/auth/token" |
|
refresh_data = { |
|
"client_id": CLIENT_ID, |
|
"refresh_token": self.refresh_token, |
|
"grant_type": "refresh_token", |
|
} |
|
user_info = await self._request_post(refresh_url, refresh_data) |
|
|
|
self.access_token = user_info["access_token"] |
|
self.refresh_token = user_info["refresh_token"] |
|
self.user_id = user_info["sub"] |
|
self.encode_token() |
|
if self.token_refresh_callback: |
|
await self.token_refresh_callback( |
|
self, **self.token_refresh_callback_kwargs |
|
) |
|
|
|
def get_user_info(self) -> Dict[str, Optional[str]]: |
|
""" |
|
Get user info |
|
""" |
|
return { |
|
"username": self.username, |
|
"user_id": self.user_id, |
|
"access_token": self.access_token, |
|
"refresh_token": self.refresh_token, |
|
"encoded_token": self.encoded_token, |
|
} |
|
|
|
async def create_folder( |
|
self, name: str = "新建文件夹", parent_id: Optional[str] = None |
|
) -> Dict[str, Any]: |
|
""" |
|
name: str - 文件夹名称 |
|
parent_id: str - 父文件夹id, 默认创建到根目录 |
|
|
|
创建文件夹 |
|
""" |
|
url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files" |
|
data = { |
|
"kind": "drive#folder", |
|
"name": name, |
|
"parent_id": parent_id, |
|
} |
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/files") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_post(url, data) |
|
return result |
|
|
|
async def delete_to_trash(self, ids: List[str]) -> Dict[str, Any]: |
|
""" |
|
ids: List[str] - 文件夹、文件id列表 |
|
|
|
将文件夹、文件移动到回收站 |
|
""" |
|
url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchTrash" |
|
data = { |
|
"ids": ids, |
|
} |
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/files:batchTrash") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_post(url, data) |
|
return result |
|
|
|
async def untrash(self, ids: List[str]) -> Dict[str, Any]: |
|
""" |
|
ids: List[str] - 文件夹、文件id列表 |
|
|
|
将文件夹、文件移出回收站 |
|
""" |
|
url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchUntrash" |
|
data = { |
|
"ids": ids, |
|
} |
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/files:batchUntrash") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_post(url, data) |
|
return result |
|
|
|
async def emptytrash(self) -> Dict[str, Any]: |
|
""" |
|
清空回收站 |
|
""" |
|
url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files/trash:empty" |
|
data = {} |
|
captcha_result = await self.captcha_init(f"PATCH:/drive/v1/files/trash:empty") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_patch(url, data) |
|
return result |
|
|
|
async def delete_forever(self, ids: List[str]) -> Dict[str, Any]: |
|
""" |
|
ids: List[str] - 文件夹、文件id列表 |
|
|
|
永远删除文件夹、文件, 慎用 |
|
""" |
|
url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchDelete" |
|
data = { |
|
"ids": ids, |
|
} |
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/files:batchDelete") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_post(url, data) |
|
return result |
|
|
|
async def offline_download( |
|
self, file_url: str, parent_id: Optional[str] = None, name: Optional[str] = None |
|
) -> Dict[str, Any]: |
|
""" |
|
file_url: str - 文件链接 |
|
parent_id: str - 父文件夹id, 不传默认存储到 My Pack |
|
name: str - 文件名, 不传默认为文件链接的文件名 |
|
|
|
离线下载磁力链 |
|
""" |
|
download_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files" |
|
download_data = { |
|
"kind": "drive#file", |
|
"name": name, |
|
"upload_type": "UPLOAD_TYPE_URL", |
|
"url": {"url": file_url, "parent_id": parent_id}, |
|
"parent_id": parent_id, |
|
} |
|
|
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/files") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_post(download_url, download_data) |
|
return result |
|
|
|
async def offline_list( |
|
self, |
|
size: int = 10000, |
|
next_page_token: Optional[str] = None, |
|
phase: Optional[List[str]] = None, |
|
) -> Dict[str, Any]: |
|
""" |
|
size: int - 每次请求的数量 |
|
next_page_token: str - 下一页的page token |
|
phase: List[str] - Offline download task status, default is ["PHASE_TYPE_RUNNING", "PHASE_TYPE_ERROR"] |
|
supported values: PHASE_TYPE_RUNNING, PHASE_TYPE_ERROR, PHASE_TYPE_COMPLETE, PHASE_TYPE_PENDING |
|
|
|
获取离线下载列表 |
|
""" |
|
if phase is None: |
|
phase = ["PHASE_TYPE_RUNNING", "PHASE_TYPE_ERROR"] |
|
list_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/tasks" |
|
list_data = { |
|
"type": "offline", |
|
"thumbnail_size": "SIZE_SMALL", |
|
"limit": size, |
|
"page_token": next_page_token, |
|
"filters": json.dumps({"phase": {"in": ",".join(phase)}}), |
|
"with": "reference_resource", |
|
} |
|
captcha_result = await self.captcha_init("GET:/drive/v1/tasks") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_get(list_url, list_data) |
|
return result |
|
|
|
async def offline_file_info(self, file_id: str) -> Dict[str, Any]: |
|
""" |
|
file_id: str - 离线下载文件id |
|
|
|
离线下载文件信息 |
|
""" |
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/files/{file_id}") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files/{file_id}" |
|
result = await self._request_get(url, {"thumbnail_size": "SIZE_LARGE"}) |
|
return result |
|
|
|
async def file_list( |
|
self, |
|
size: int = 100, |
|
parent_id: Optional[str] = None, |
|
next_page_token: Optional[str] = None, |
|
additional_filters: Optional[Dict[str, Any]] = None, |
|
) -> Dict[str, Any]: |
|
""" |
|
size: int - 每次请求的数量 |
|
parent_id: str - 父文件夹id, 默认列出根目录 |
|
next_page_token: str - 下一页的page token |
|
additional_filters: Dict[str, Any] - 额外的过滤条件 |
|
|
|
获取文件列表,可以获得文件下载链接 |
|
""" |
|
default_filters = { |
|
"trashed": {"eq": False}, |
|
"phase": {"eq": "PHASE_TYPE_COMPLETE"}, |
|
} |
|
if additional_filters: |
|
default_filters.update(additional_filters) |
|
list_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files" |
|
list_data = { |
|
"parent_id": parent_id, |
|
"thumbnail_size": "SIZE_MEDIUM", |
|
"limit": size, |
|
"with_audit": "true", |
|
"page_token": next_page_token, |
|
"filters": json.dumps(default_filters), |
|
} |
|
|
|
captcha_result = await self.captcha_init("GET:/drive/v1/files") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_get(list_url, list_data) |
|
return result |
|
|
|
async def events( |
|
self, size: int = 100, next_page_token: Optional[str] = None |
|
) -> Dict[str, Any]: |
|
""" |
|
size: int - 每次请求的数量 |
|
next_page_token: str - 下一页的page token |
|
|
|
获取最近添加事件列表 |
|
""" |
|
list_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/events" |
|
list_data = { |
|
"thumbnail_size": "SIZE_MEDIUM", |
|
"limit": size, |
|
"next_page_token": next_page_token, |
|
} |
|
captcha_result = await self.captcha_init("GET:/drive/v1/files") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_get(list_url, list_data) |
|
return result |
|
|
|
async def offline_task_retry(self, task_id: str) -> Dict[str, Any]: |
|
""" |
|
task_id: str - 离线下载任务id |
|
|
|
重试离线下载任务 |
|
""" |
|
list_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/task" |
|
list_data = { |
|
"type": "offline", |
|
"create_type": "RETRY", |
|
"id": task_id, |
|
} |
|
try: |
|
captcha_result = await self.captcha_init("GET:/drive/v1/task") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_post(list_url, list_data) |
|
return result |
|
except Exception as e: |
|
raise PikpakException(f"重试离线下载任务失败: {task_id}. {e}") |
|
|
|
async def delete_tasks( |
|
self, task_ids: List[str], delete_files: bool = False |
|
) -> None: |
|
""" |
|
delete tasks by task ids |
|
task_ids: List[str] - task ids to delete |
|
""" |
|
delete_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/tasks" |
|
params = { |
|
"task_ids": task_ids, |
|
"delete_files": delete_files, |
|
} |
|
try: |
|
captcha_result = await self.captcha_init("GET:/drive/v1/tasks") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
await self._request_delete(delete_url, params=params) |
|
except Exception as e: |
|
raise PikpakException(f"Failing to delete tasks: {task_ids}. {e}") |
|
|
|
async def get_task_status(self, task_id: str, file_id: str) -> DownloadStatus: |
|
""" |
|
task_id: str - 离线下载任务id |
|
file_id: str - 离线下载文件id |
|
|
|
获取离线下载任务状态, 临时实现, 后期可能变更 |
|
""" |
|
try: |
|
infos = await self.offline_list() |
|
if infos and infos.get("tasks", []): |
|
for task in infos.get("tasks", []): |
|
if task_id == task.get("id"): |
|
return DownloadStatus.downloading |
|
file_info = await self.offline_file_info(file_id=file_id) |
|
if file_info: |
|
return DownloadStatus.done |
|
else: |
|
return DownloadStatus.not_found |
|
except PikpakException: |
|
return DownloadStatus.error |
|
|
|
async def path_to_id(self, path: str, create: bool = False) -> List[Dict[str, str]]: |
|
""" |
|
path: str - 路径 |
|
create: bool - 是否创建不存在的文件夹 |
|
|
|
将形如 /path/a/b 的路径转换为 文件夹的id |
|
""" |
|
if not path or len(path) <= 0: |
|
return [] |
|
paths = path.split("/") |
|
paths = [p.strip() for p in paths if len(p) > 0] |
|
|
|
multi_level_paths = ["/" + "/".join(paths[: i + 1]) for i in range(len(paths))] |
|
path_ids = [ |
|
self._path_id_cache[p] |
|
for p in multi_level_paths |
|
if p in self._path_id_cache |
|
] |
|
|
|
hit_cnt = len(path_ids) |
|
if hit_cnt == len(paths): |
|
return path_ids |
|
elif hit_cnt == 0: |
|
count = 0 |
|
parent_id = None |
|
else: |
|
count = hit_cnt |
|
parent_id = path_ids[-1]["id"] |
|
|
|
next_page_token = None |
|
while count < len(paths): |
|
data = await self.file_list( |
|
parent_id=parent_id, next_page_token=next_page_token |
|
) |
|
record_of_target_path = None |
|
for f in data.get("files", []): |
|
current_path = "/" + "/".join(paths[:count] + [f.get("name")]) |
|
file_type = ( |
|
"folder" if f.get("kind", "").find("folder") != -1 else "file" |
|
) |
|
record = { |
|
"id": f.get("id"), |
|
"name": f.get("name"), |
|
"file_type": file_type, |
|
} |
|
self._path_id_cache[current_path] = record |
|
if f.get("name") == paths[count]: |
|
record_of_target_path = record |
|
|
|
if record_of_target_path is not None: |
|
path_ids.append(record_of_target_path) |
|
count += 1 |
|
parent_id = record_of_target_path["id"] |
|
elif data.get("next_page_token") and ( |
|
not next_page_token or next_page_token != data.get("next_page_token") |
|
): |
|
next_page_token = data.get("next_page_token") |
|
elif create: |
|
data = await self.create_folder(name=paths[count], parent_id=parent_id) |
|
file_id = data.get("file").get("id") |
|
record = { |
|
"id": file_id, |
|
"name": paths[count], |
|
"file_type": "folder", |
|
} |
|
path_ids.append(record) |
|
current_path = "/" + "/".join(paths[: count + 1]) |
|
self._path_id_cache[current_path] = record |
|
count += 1 |
|
parent_id = file_id |
|
else: |
|
break |
|
return path_ids |
|
|
|
async def file_batch_move( |
|
self, |
|
ids: List[str], |
|
to_parent_id: Optional[str] = None, |
|
) -> Dict[str, Any]: |
|
""" |
|
ids: List[str] - 文件id列表 |
|
to_parent_id: str - 移动到的文件夹id, 默认为根目录 |
|
|
|
批量移动文件 |
|
""" |
|
to = ( |
|
{ |
|
"parent_id": to_parent_id, |
|
} |
|
if to_parent_id |
|
else {} |
|
) |
|
captcha_result = await self.captcha_init("GET:/drive/v1/files:batchMove") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_post( |
|
url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchMove", |
|
data={ |
|
"ids": ids, |
|
"to": to, |
|
}, |
|
) |
|
return result |
|
|
|
async def file_batch_copy( |
|
self, |
|
ids: List[str], |
|
to_parent_id: Optional[str] = None, |
|
) -> Dict[str, Any]: |
|
""" |
|
ids: List[str] - 文件id列表 |
|
to_parent_id: str - 复制到的文件夹id, 默认为根目录 |
|
|
|
批量复制文件 |
|
""" |
|
to = ( |
|
{ |
|
"parent_id": to_parent_id, |
|
} |
|
if to_parent_id |
|
else {} |
|
) |
|
captcha_result = await self.captcha_init("GET:/drive/v1/files:batchCopy") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_post( |
|
url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchCopy", |
|
data={ |
|
"ids": ids, |
|
"to": to, |
|
}, |
|
) |
|
return result |
|
|
|
async def file_move_or_copy_by_path( |
|
self, |
|
from_path: List[str], |
|
to_path: str, |
|
move: bool = False, |
|
create: bool = False, |
|
) -> Dict[str, Any]: |
|
""" |
|
from_path: List[str] - 要移动或复制的文件路径列表 |
|
to_path: str - 移动或复制到的路径 |
|
is_move: bool - 是否移动, 默认为复制 |
|
create: bool - 是否创建不存在的文件夹 |
|
|
|
根据路径移动或复制文件 |
|
""" |
|
from_ids: List[str] = [] |
|
for path in from_path: |
|
if path_ids := await self.path_to_id(path): |
|
if file_id := path_ids[-1].get("id"): |
|
from_ids.append(file_id) |
|
if not from_ids: |
|
raise PikpakException("要移动的文件不存在") |
|
to_path_ids = await self.path_to_id(to_path, create=create) |
|
if to_path_ids: |
|
to_parent_id = to_path_ids[-1].get("id") |
|
else: |
|
to_parent_id = None |
|
if move: |
|
result = await self.file_batch_move(ids=from_ids, to_parent_id=to_parent_id) |
|
else: |
|
result = await self.file_batch_copy(ids=from_ids, to_parent_id=to_parent_id) |
|
return result |
|
|
|
async def get_download_url(self, file_id: str) -> Dict[str, Any]: |
|
""" |
|
id: str - 文件id |
|
|
|
Returns the file details data. |
|
1. Use `medias[0][link][url]` for streaming with high speed in streaming services or tools. |
|
2. Use `web_content_link` to download the file |
|
""" |
|
result = await self.captcha_init( |
|
action=f"GET:/drive/v1/files/{file_id}", |
|
) |
|
self.captcha_token = result.get("captcha_token") |
|
result = await self._request_get( |
|
url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files/{file_id}?", |
|
) |
|
self.captcha_token = None |
|
return result |
|
|
|
async def file_rename(self, id: str, new_file_name: str) -> Dict[str, Any]: |
|
""" |
|
id: str - 文件id |
|
new_file_name: str - 新的文件名 |
|
|
|
重命名文件 |
|
返回文件的详细信息 |
|
""" |
|
data = { |
|
"name": new_file_name, |
|
} |
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/files/{id}") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_patch( |
|
url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files/{id}", |
|
data=data, |
|
) |
|
return result |
|
|
|
async def file_batch_star( |
|
self, |
|
ids: List[str], |
|
) -> Dict[str, Any]: |
|
""" |
|
ids: List[str] - 文件id列表 |
|
|
|
批量给文件加星标 |
|
""" |
|
data = { |
|
"ids": ids, |
|
} |
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/files/star") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_post( |
|
url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files:star", |
|
data=data, |
|
) |
|
return result |
|
|
|
async def file_batch_unstar( |
|
self, |
|
ids: List[str], |
|
) -> Dict[str, Any]: |
|
""" |
|
ids: List[str] - 文件id列表 |
|
|
|
批量给文件取消星标 |
|
""" |
|
data = { |
|
"ids": ids, |
|
} |
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/files/unstar") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_post( |
|
url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files:unstar", |
|
data=data, |
|
) |
|
return result |
|
|
|
async def file_star_list( |
|
self, |
|
size: int = 100, |
|
next_page_token: Optional[str] = None, |
|
) -> Dict[str, Any]: |
|
""" |
|
size: int - 每次请求的数量 |
|
next_page_token: str - 下一页的page token |
|
|
|
获取加星标的文件列表,可以获得文件下载链接 |
|
parent_id只可取默认值*,子目录列表通过获取星标目录以后自行使用file_list方法获取 |
|
""" |
|
additional_filters = {"system_tag": {"in": "STAR"}} |
|
result = await self.file_list( |
|
size=size, |
|
parent_id="*", |
|
next_page_token=next_page_token, |
|
additional_filters=additional_filters, |
|
) |
|
return result |
|
|
|
async def file_batch_share( |
|
self, |
|
ids: List[str], |
|
need_password: Optional[bool] = False, |
|
expiration_days: Optional[int] = -1, |
|
) -> Dict[str, Any]: |
|
""" |
|
ids: List[str] - 文件id列表 |
|
need_password: Optional[bool] - 是否需要分享密码 |
|
expiration_days: Optional[int] - 分享天数 |
|
|
|
批量分享文件,并生成分享链接 |
|
返回数据结构: |
|
{ |
|
"share_id": "xxx", //分享ID |
|
"share_url": "https://mypikpak.com/s/xxx", // 分享链接 |
|
"pass_code": "53fe", // 分享密码 |
|
"share_text": "https://mypikpak.com/s/xxx", |
|
"share_list": [] |
|
} |
|
""" |
|
data = { |
|
"file_ids": ids, |
|
"share_to": "encryptedlink" if need_password else "publiclink", |
|
"expiration_days": expiration_days, |
|
"pass_code_option": "REQUIRED" if need_password else "NOT_REQUIRED", |
|
} |
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/share") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_post( |
|
url=f"https://{self.PIKPAK_API_HOST}/drive/v1/share", |
|
data=data, |
|
) |
|
return result |
|
|
|
async def get_quota_info(self) -> Dict[str, Any]: |
|
""" |
|
获取当前空间的quota信息 |
|
返回数据结构如下: |
|
{ |
|
"kind": "drive#about", |
|
"quota": { |
|
"kind": "drive#quota", |
|
"limit": "10995116277760", //空间总大小, 单位Byte |
|
"usage": "5113157556024", // 已用空间大小,单位Byte |
|
"usage_in_trash": "1281564700871", // 回收站占用大小,单位Byte |
|
"play_times_limit": "-1", |
|
"play_times_usage": "0" |
|
}, |
|
"expires_at": "", |
|
"quotas": {} |
|
} |
|
""" |
|
|
|
|
|
result = await self._request_get( |
|
url=f"https://{self.PIKPAK_API_HOST}/drive/v1/about", |
|
) |
|
return result |
|
|
|
async def get_invite_code(self): |
|
captcha_result = await self.captcha_init(f"GET:/vip/v1/activity/inviteCode") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_get( |
|
url=f"https://{self.PIKPAK_API_HOST}/vip/v1/activity/inviteCode", |
|
) |
|
return result["code"] |
|
|
|
async def vip_info(self): |
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/privilege/vip") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_get( |
|
url=f"https://{self.PIKPAK_API_HOST}/drive/v1/privilege/vip", |
|
) |
|
return result |
|
|
|
async def get_transfer_quota(self) -> Dict[str, Any]: |
|
""" |
|
Get transfer quota |
|
""" |
|
url = f"https://{self.PIKPAK_API_HOST}/vip/v1/quantity/list?type=transfer" |
|
captcha_result = await self.captcha_init( |
|
f"GET:/vip/v1/quantity/list?type=transfer" |
|
) |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_get(url) |
|
return result |
|
|
|
async def get_share_folder( |
|
self, share_id: str, pass_code_token: str, parent_id: str = None |
|
) -> Dict[str, Any]: |
|
""" |
|
获取分享链接下文件夹内容 |
|
|
|
Args: |
|
share_id: str - 分享ID eg. /s/VO8BcRb-XXXXX 的 VO8BcRb-XXXXX |
|
pass_code_token: str - 通过 get_share_info 获取到的 pass_code_token |
|
parent_id: str - 父文件夹id, 默认列出根目录 |
|
""" |
|
data = { |
|
"limit": "100", |
|
"thumbnail_size": "SIZE_LARGE", |
|
"order": "6", |
|
"share_id": share_id, |
|
"parent_id": parent_id, |
|
"pass_code_token": pass_code_token, |
|
} |
|
url = f"https://{self.PIKPAK_API_HOST}/drive/v1/share/detail" |
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/share/detail") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
return await self._request_get(url, params=data) |
|
|
|
async def get_share_info( |
|
self, share_link: str, pass_code: str = None |
|
) -> ValueError | Dict[str, Any] | List[Dict[str | Any, str | Any]]: |
|
""" |
|
获取分享链接下内容 |
|
|
|
Args: |
|
share_link: str - 分享链接 |
|
pass_code: str - 分享密码, 无密码则留空 |
|
""" |
|
match = re.search(r"/s/([^/]+)(?:.*/([^/]+))?$", share_link) |
|
if match: |
|
share_id = match.group(1) |
|
parent_id = match.group(2) if match.group(2) else None |
|
else: |
|
return ValueError("Share Link Is Not Right") |
|
|
|
data = { |
|
"limit": "100", |
|
"thumbnail_size": "SIZE_LARGE", |
|
"order": "3", |
|
"share_id": share_id, |
|
"parent_id": parent_id, |
|
"pass_code": pass_code, |
|
} |
|
url = f"https://{self.PIKPAK_API_HOST}/drive/v1/share" |
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/share") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
return await self._request_get(url, params=data) |
|
|
|
async def restore( |
|
self, share_id: str, pass_code_token: str, file_ids: List[str] |
|
) -> Dict[str, Any]: |
|
""" |
|
|
|
Args: |
|
share_id: 分享链接eg. /s/VO8BcRb-XXXXX 的 VO8BcRb-XXXXX |
|
pass_code_token: get_share_info获取, 无密码则留空 |
|
file_ids: 需要转存的文件/文件夹ID列表, get_share_info获取id值 |
|
""" |
|
data = { |
|
"share_id": share_id, |
|
"pass_code_token": pass_code_token, |
|
"file_ids": file_ids, |
|
} |
|
captcha_result = await self.captcha_init(f"GET:/drive/v1/share/restore") |
|
self.captcha_token = captcha_result.get("captcha_token") |
|
result = await self._request_post( |
|
url=f"https://{self.PIKPAK_API_HOST}/drive/v1/share/restore", data=data |
|
) |
|
return result |
|
|