Spaces:
Running
Running
File size: 6,436 Bytes
21db53c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
import os
from asyncio import to_thread
from pathlib import Path as syncPath
from shutil import copy2, move
from typing import Optional, AsyncGenerator
import aiofiles
from loguru import logger
from app.Services.storage.base import BaseStorage, FileMetaDataT, RemoteFilePathType, LocalFilePathType
from app.Services.storage.exception import RemoteFileNotFoundError, LocalFileNotFoundError, RemoteFilePermissionError, \
LocalFilePermissionError, LocalFileExistsError, RemoteFileExistsError
from app.config import config
from app.util.local_file_utility import glob_local_files
def transform_exception(param: str):
file_not_found_exp_map = {"local": LocalFileNotFoundError, "remote": RemoteFileNotFoundError}
permission_exp_map = {"remote": RemoteFilePermissionError, "local": LocalFilePermissionError}
file_exist_map = {"local": LocalFileExistsError, "remote": RemoteFileExistsError}
def decorator(func):
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except FileNotFoundError as ex:
raise file_not_found_exp_map[param] from ex
except PermissionError as ex:
raise permission_exp_map[param] from ex
except FileExistsError as ex:
raise file_exist_map[param] from ex
return wrapper
return decorator
class LocalStorage(BaseStorage[FileMetaDataT: None]):
def __init__(self):
super().__init__()
self.static_dir = syncPath(os.path.abspath(config.storage.local.path))
self.thumbnails_dir = self.static_dir / "thumbnails"
self.deleted_dir = self.static_dir / "_deleted"
self.file_metadata = None
self.file_path_warp = lambda x: self.static_dir / syncPath(x)
def file_path_wrap(self, path: RemoteFilePathType) -> syncPath:
return self.static_dir / syncPath(path)
async def on_load(self):
if not self.static_dir.is_dir():
self.static_dir.mkdir(parents=True)
logger.warning(f"static_dir {self.static_dir} not found, created.")
if not self.thumbnails_dir.is_dir():
self.thumbnails_dir.mkdir(parents=True)
logger.warning(f"thumbnails_dir {self.thumbnails_dir} not found, created.")
if not self.deleted_dir.is_dir():
self.deleted_dir.mkdir(parents=True)
logger.warning(f"deleted_dir {self.deleted_dir} not found, created.")
async def is_exist(self,
remote_file: "RemoteFilePathType") -> bool:
return self.file_path_warp(remote_file).exists()
@transform_exception("remote")
async def size(self,
remote_file: "RemoteFilePathType") -> int:
_file = self.file_path_warp(remote_file)
return self.file_path_warp(remote_file).stat().st_size
# noinspection PyMethodMayBeStatic
async def url(self,
remote_file: "RemoteFilePathType") -> str:
return f"/static/{str(remote_file)}"
async def presign_url(self,
remote_file: "RemoteFilePathType",
expire_second: int = 3600) -> str:
return f"/static/{str(remote_file)}"
@transform_exception("remote")
async def fetch(self,
remote_file: "RemoteFilePathType") -> bytes:
remote_file = self.file_path_warp(remote_file)
async with aiofiles.open(str(remote_file), 'rb') as file:
return await file.read()
@transform_exception("local")
async def upload(self,
local_file: "LocalFilePathType",
remote_file: "RemoteFilePathType") -> None:
remote_file = self.file_path_warp(remote_file)
if isinstance(local_file, bytes):
async with aiofiles.open(str(remote_file), 'wb') as file:
await file.write(local_file)
else:
await to_thread(copy2, str(local_file), str(remote_file))
local_file = f"{len(local_file)} bytes" if isinstance(local_file, bytes) else local_file
logger.success(f"Successfully uploaded file {str(local_file)} to {str(remote_file)} via local_storage.")
@transform_exception("remote")
async def copy(self,
old_remote_file: "RemoteFilePathType",
new_remote_file: "RemoteFilePathType") -> None:
old_remote_file = self.file_path_warp(old_remote_file)
new_remote_file = self.file_path_warp(new_remote_file)
await to_thread(copy2, str(old_remote_file), str(new_remote_file))
logger.success(f"Successfully copied file {str(old_remote_file)} to {str(new_remote_file)} via local_storage.")
@transform_exception("remote")
async def move(self,
old_remote_file: "RemoteFilePathType",
new_remote_file: "RemoteFilePathType") -> None:
old_remote_file = self.file_path_warp(old_remote_file)
new_remote_file = self.file_path_warp(new_remote_file)
await to_thread(move, str(old_remote_file), str(new_remote_file), copy_function=copy2)
logger.success(f"Successfully moved file {str(old_remote_file)} to {str(new_remote_file)} via local_storage.")
@transform_exception("remote")
async def delete(self,
remote_file: "RemoteFilePathType") -> None:
remote_file = self.file_path_warp(remote_file)
await to_thread(os.remove, str(remote_file))
logger.success(f"Successfully deleted file {str(remote_file)} via local_storage.")
async def list_files(self,
path: RemoteFilePathType,
pattern: Optional[str] = "*",
batch_max_files: Optional[int] = None,
valid_extensions: Optional[set[str]] = None) \
-> AsyncGenerator[list[RemoteFilePathType], None]:
local_path = self.file_path_warp(path)
files = []
for file in glob_local_files(local_path, pattern, valid_extensions):
files.append(file)
if batch_max_files is not None and len(files) == batch_max_files:
yield files
files = []
if files:
yield files
async def update_metadata(self,
local_file_metadata: None,
remote_file_metadata: None) -> None:
raise NotImplementedError
|