File size: 6,436 Bytes
21db53c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
from asyncio import to_thread
from pathlib import Path as syncPath
from shutil import copy2, move
from typing import Optional, AsyncGenerator

import aiofiles
from loguru import logger

from app.Services.storage.base import BaseStorage, FileMetaDataT, RemoteFilePathType, LocalFilePathType
from app.Services.storage.exception import RemoteFileNotFoundError, LocalFileNotFoundError, RemoteFilePermissionError, \
    LocalFilePermissionError, LocalFileExistsError, RemoteFileExistsError
from app.config import config
from app.util.local_file_utility import glob_local_files


def transform_exception(param: str):
    """Build a decorator that maps built-in OS errors to storage exceptions.

    The decorated coroutine function is awaited as-is; if it raises
    ``FileNotFoundError``, ``PermissionError`` or ``FileExistsError``, the
    matching ``Local*``/``Remote*`` storage exception is raised instead, with
    the original error chained as ``__cause__``. Any other exception
    propagates unchanged.

    Args:
        param: Which exception family to raise: ``"local"`` or ``"remote"``.

    Returns:
        A decorator for coroutine functions (``async def``).

    Raises:
        ValueError: If ``param`` is neither ``"local"`` nor ``"remote"``.
    """
    # Function-scope import so this block does not depend on a file-level import.
    from functools import wraps

    file_not_found_exp_map = {"local": LocalFileNotFoundError, "remote": RemoteFileNotFoundError}
    permission_exp_map = {"remote": RemoteFilePermissionError, "local": LocalFilePermissionError}
    file_exist_map = {"local": LocalFileExistsError, "remote": RemoteFileExistsError}

    # Fail at decoration time on a bad side name, rather than with a KeyError
    # raised from inside an ``except`` handler while an OSError is in flight.
    if param not in file_not_found_exp_map:
        raise ValueError(f"param must be 'local' or 'remote', got {param!r}")

    # Resolve the target exception classes once instead of on every failure.
    not_found_exc = file_not_found_exp_map[param]
    permission_exc = permission_exp_map[param]
    exists_exc = file_exist_map[param]

    def decorator(func):
        @wraps(func)  # keep the wrapped coroutine's name, docstring and signature
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except FileNotFoundError as ex:
                raise not_found_exc from ex
            except PermissionError as ex:
                raise permission_exc from ex
            except FileExistsError as ex:
                raise exists_exc from ex

        return wrapper

    return decorator


class LocalStorage(BaseStorage[FileMetaDataT: None]):
    """Storage backend that keeps files in a directory on the local filesystem.

    "Remote" paths passed to the methods are joined onto ``static_dir``, which
    is the absolute form of ``config.storage.local.path``.
    """

    # NOTE(review): the base-class subscript above is a slice expression
    # (``FileMetaDataT: None``), not a plain type argument - confirm that
    # BaseStorage is meant to be parameterised this way.

    def __init__(self):
        super().__init__()
        # Root directory for every stored file, made absolute once here.
        self.static_dir = syncPath(os.path.abspath(config.storage.local.path))
        self.thumbnails_dir = self.static_dir / "thumbnails"
        self.deleted_dir = self.static_dir / "_deleted"
        # No metadata is tracked here; update_metadata is not implemented.
        self.file_metadata = None
        # Misspelled historical name, kept as an alias so existing callers that
        # use ``file_path_warp`` keep working; new code should call file_path_wrap.
        self.file_path_warp = self.file_path_wrap

    def file_path_wrap(self, path: RemoteFilePathType) -> syncPath:
        """Return ``path`` joined onto ``static_dir``.

        NOTE(review): pathlib discards the left operand when ``path`` is
        absolute, and ``..`` segments are not collapsed, so the result can
        point outside ``static_dir``. Presumably callers pass trusted relative
        paths - confirm.
        """
        return self.static_dir / syncPath(path)

    async def on_load(self):
        """Create the static, thumbnails and deleted directories if missing.

        A warning is logged for each directory that had to be created.
        """
        managed_dirs = (
            ("static_dir", self.static_dir),
            ("thumbnails_dir", self.thumbnails_dir),
            ("deleted_dir", self.deleted_dir),
        )
        for label, directory in managed_dirs:
            if directory.is_dir():
                continue
            # exist_ok avoids a spurious failure if the directory appears
            # between the is_dir() check and the mkdir() call.
            directory.mkdir(parents=True, exist_ok=True)
            logger.warning(f"{label} {directory} not found, created.")

    async def is_exist(self,
                       remote_file: "RemoteFilePathType") -> bool:
        """Return whether ``remote_file`` exists under ``static_dir``."""
        return self.file_path_wrap(remote_file).exists()

    @transform_exception("remote")
    async def size(self,
                   remote_file: "RemoteFilePathType") -> int:
        """Return the size in bytes reported by ``stat()`` for ``remote_file``.

        Built-in not-found / permission / exists errors are re-raised as the
        corresponding ``Remote*`` storage exceptions by the decorator.
        """
        return self.file_path_wrap(remote_file).stat().st_size

    # noinspection PyMethodMayBeStatic
    async def url(self,
                  remote_file: "RemoteFilePathType") -> str:
        """Return the ``/static/``-prefixed URL string for ``remote_file``.

        The path is only formatted; the file is not checked for existence.
        """
        return f"/static/{remote_file!s}"

    async def presign_url(self,
                          remote_file: "RemoteFilePathType",
                          expire_second: int = 3600) -> str:
        """Return the same URL as :meth:`url`.

        ``expire_second`` is accepted but not used by this backend.
        """
        return await self.url(remote_file)

    @transform_exception("remote")
    async def fetch(self,
                    remote_file: "RemoteFilePathType") -> bytes:
        """Read and return the full contents of ``remote_file`` as bytes.

        Built-in not-found / permission / exists errors are re-raised as the
        corresponding ``Remote*`` storage exceptions by the decorator.
        """
        target = self.file_path_wrap(remote_file)
        async with aiofiles.open(str(target), 'rb') as file:
            return await file.read()

    @transform_exception("local")
    async def upload(self,
                     local_file: "LocalFilePathType",
                     remote_file: "RemoteFilePathType") -> None:
        """Store ``local_file`` at ``remote_file`` under ``static_dir``.

        ``local_file`` may be raw ``bytes`` (written directly) or a path
        (copied with ``shutil.copy2`` in a worker thread). Built-in not-found /
        permission / exists errors are re-raised as the corresponding
        ``Local*`` storage exceptions by the decorator.
        """
        target = self.file_path_wrap(remote_file)
        if isinstance(local_file, bytes):
            async with aiofiles.open(str(target), 'wb') as file:
                await file.write(local_file)
            # Log the payload size rather than dumping the bytes themselves.
            source_desc = f"{len(local_file)} bytes"
        else:
            await to_thread(copy2, str(local_file), str(target))
            source_desc = local_file
        logger.success(f"Successfully uploaded file {str(source_desc)} to {str(target)} via local_storage.")

    @transform_exception("remote")
    async def copy(self,
                   old_remote_file: "RemoteFilePathType",
                   new_remote_file: "RemoteFilePathType") -> None:
        """Copy ``old_remote_file`` to ``new_remote_file`` inside ``static_dir``.

        The copy uses ``shutil.copy2`` in a worker thread. Built-in not-found /
        permission / exists errors are re-raised as the corresponding
        ``Remote*`` storage exceptions by the decorator.
        """
        source = self.file_path_wrap(old_remote_file)
        destination = self.file_path_wrap(new_remote_file)
        await to_thread(copy2, str(source), str(destination))
        logger.success(f"Successfully copied file {str(source)} to {str(destination)} via local_storage.")

    @transform_exception("remote")
    async def move(self,
                   old_remote_file: "RemoteFilePathType",
                   new_remote_file: "RemoteFilePathType") -> None:
        """Move ``old_remote_file`` to ``new_remote_file`` inside ``static_dir``.

        Built-in not-found / permission / exists errors are re-raised as the
        corresponding ``Remote*`` storage exceptions by the decorator.
        """
        source = self.file_path_wrap(old_remote_file)
        destination = self.file_path_wrap(new_remote_file)
        # ``move`` here is the module-level ``shutil.move`` import: class-level
        # names (this method) are not visible from inside a method body.
        await to_thread(move, str(source), str(destination), copy_function=copy2)
        logger.success(f"Successfully moved file {str(source)} to {str(destination)} via local_storage.")

    @transform_exception("remote")
    async def delete(self,
                     remote_file: "RemoteFilePathType") -> None:
        """Remove ``remote_file`` from disk with ``os.remove`` in a worker thread.

        Built-in not-found / permission / exists errors are re-raised as the
        corresponding ``Remote*`` storage exceptions by the decorator.
        """
        target = self.file_path_wrap(remote_file)
        await to_thread(os.remove, str(target))
        logger.success(f"Successfully deleted file {str(target)} via local_storage.")

    async def list_files(self,
                         path: RemoteFilePathType,
                         pattern: Optional[str] = "*",
                         batch_max_files: Optional[int] = None,
                         valid_extensions: Optional[set[str]] = None) \
            -> AsyncGenerator[list[RemoteFilePathType], None]:
        """Yield the entries produced by ``glob_local_files`` in batches.

        Args:
            path: Directory to search, relative to ``static_dir``.
            pattern: Passed through to ``glob_local_files``.
            batch_max_files: Size at which a batch is yielded; with ``None``
                everything is yielded as one list.
            valid_extensions: Passed through to ``glob_local_files``.

        Yields:
            Non-empty lists of entries; the last one may be shorter than
            ``batch_max_files``. Nothing is yielded when there are no matches.
        """
        search_root = self.file_path_wrap(path)
        batch = []
        for entry in glob_local_files(search_root, pattern, valid_extensions):
            batch.append(entry)
            if batch_max_files is not None and len(batch) == batch_max_files:
                yield batch
                batch = []
        # Flush the trailing partial batch (or the single batch when unbounded).
        if batch:
            yield batch

    async def update_metadata(self,
                              local_file_metadata: None,
                              remote_file_metadata: None) -> None:
        """Not implemented for this backend; always raises ``NotImplementedError``."""
        raise NotImplementedError