# Copyright (c) Alibaba, Inc. and its affiliates.
import contextlib
import os
import tempfile
from abc import ABCMeta, abstractmethod
from pathlib import Path
from typing import Generator, Union
from urllib.parse import urlparse

try:
    import requests
except ImportError:
    # Keep the local backend importable when the HTTP dependency is missing;
    # HTTPStorage raises a clear ImportError on first use instead.
    requests = None


def download_from_url(url):
    """Download ``url`` into a fresh temporary directory.

    The file keeps the base name of the URL path (query string and fragment
    are not part of the name). The temporary directory is not removed by this
    function; the caller owns the returned file.

    Args:
        url (str): URL to download. It must carry a scheme.

    Returns:
        str: Path of the downloaded file.

    Raises:
        AssertionError: If ``url`` has no scheme.
    """
    result = urlparse(url)
    if not result.scheme:
        # Raised explicitly (rather than via ``assert``) so the check is not
        # stripped under ``python -O``; the exception type is unchanged.
        raise AssertionError(f"failed to download: {url}")

    # Download first so that a failed request leaves no directory behind.
    data = HTTPStorage().read(url)

    work_dir = tempfile.mkdtemp()
    file_name = os.path.basename(result.path) or "download"
    file_path = os.path.join(work_dir, file_name)
    with open(file_path, "wb") as fb:
        fb.write(data)
    return file_path


@contextlib.contextmanager
def _as_temporary_file(data: bytes) -> Generator[str, None, None]:
    """Write ``data`` to a temporary file and yield its path.

    The file is removed when the ``with`` block is left, including when
    writing the data or the body of the block raises.
    """
    handle = tempfile.NamedTemporaryFile(delete=False)
    try:
        # Close the handle before yielding so the path can be reopened by the
        # caller on every platform.
        with handle:
            handle.write(data)
        yield handle.name
    finally:
        os.remove(handle.name)


class Storage(metaclass=ABCMeta):
    """Abstract class of storage.

    All backends need to implement four apis: ``read()``, ``read_text()``,
    ``write()`` and ``write_text()``. ``read()``/``write()`` handle the file
    as a byte stream and ``read_text()``/``write_text()`` handle it as text.
    """

    @abstractmethod
    def read(self, filepath: str):
        pass

    @abstractmethod
    def read_text(self, filepath: str):
        pass

    @abstractmethod
    def write(self, obj: bytes, filepath: Union[str, Path]) -> None:
        pass

    @abstractmethod
    def write_text(
        self, obj: str, filepath: Union[str, Path], encoding: str = "utf-8"
    ) -> None:
        pass


class LocalStorage(Storage):
    """Local hard disk storage"""

    def read(self, filepath: Union[str, Path]) -> bytes:
        """Read data from a given ``filepath`` with 'rb' mode.

        Args:
            filepath (str or Path): Path to read data.

        Returns:
            bytes: Expected bytes object.
        """
        with open(filepath, "rb") as f:
            return f.read()

    def read_text(self, filepath: Union[str, Path], encoding: str = "utf-8") -> str:
        """Read data from a given ``filepath`` with 'r' mode.

        Args:
            filepath (str or Path): Path to read data.
            encoding (str): The encoding format used to open the ``filepath``.
                Default: 'utf-8'.

        Returns:
            str: Expected text reading from ``filepath``.
        """
        with open(filepath, "r", encoding=encoding) as f:
            return f.read()

    def write(self, obj: bytes, filepath: Union[str, Path]) -> None:
        """Write data to a given ``filepath`` with 'wb' mode.

        Note:
            ``write`` will create a directory if the directory of ``filepath``
            does not exist.

        Args:
            obj (bytes): Data to be written.
            filepath (str or Path): Path to write data.
        """
        dirname = os.path.dirname(filepath)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname, exist_ok=True)

        with open(filepath, "wb") as f:
            f.write(obj)

    def write_text(
        self, obj: str, filepath: Union[str, Path], encoding: str = "utf-8"
    ) -> None:
        """Write data to a given ``filepath`` with 'w' mode.

        Note:
            ``write_text`` will create a directory if the directory of
            ``filepath`` does not exist.

        Args:
            obj (str): Data to be written.
            filepath (str or Path): Path to write data.
            encoding (str): The encoding format used to open the ``filepath``.
                Default: 'utf-8'.
        """
        dirname = os.path.dirname(filepath)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname, exist_ok=True)

        with open(filepath, "w", encoding=encoding) as f:
            f.write(obj)

    @contextlib.contextmanager
    def as_local_path(
        self, filepath: Union[str, Path]
    ) -> Generator[Union[str, Path], None, None]:
        """Only for unified API and do nothing."""
        yield filepath


class HTTPStorage(Storage):
    """HTTP and HTTPS storage (read-only)."""

    @staticmethod
    def _get(url):
        """Send a GET request for ``url`` and return the successful response."""
        if requests is None:
            raise ImportError(
                "HTTPStorage requires the 'requests' package to be installed"
            )
        response = requests.get(url)
        response.raise_for_status()
        return response

    def read(self, url):
        """Return the body of ``url`` as bytes."""
        # TODO @wenmeng.zwm add progress bar if file is too large
        return self._get(url).content

    def read_text(self, url):
        """Return the body of ``url`` as text, decoded by ``requests``."""
        return self._get(url).text

    @contextlib.contextmanager
    def as_local_path(self, filepath: str) -> Generator[Union[str, Path], None, None]:
        """Download a file from ``filepath``.

        ``as_local_path`` is decorated by :meth:`contextlib.contextmanager`. It
        can be called with ``with`` statement, and when exiting from the
        ``with`` statement, the temporary path will be released.

        Args:
            filepath (str): Download a file from ``filepath``.

        Examples:
            >>> storage = HTTPStorage()
            >>> # After exiting from the ``with`` clause,
            >>> # the path will be removed
            >>> with storage.as_local_path('http://path/to/file') as path:
            ...     # do something here
        """
        # The download happens before the temporary file is created, so a
        # failed request neither leaks a file nor masks the original error.
        with _as_temporary_file(self.read(filepath)) as local_path:
            yield local_path

    def write(self, obj: bytes, url: Union[str, Path]) -> None:
        raise NotImplementedError("write is not supported by HTTP Storage")

    def write_text(
        self, obj: str, url: Union[str, Path], encoding: str = "utf-8"
    ) -> None:
        raise NotImplementedError("write_text is not supported by HTTP Storage")


class OSSStorage(Storage):
    """OSS storage."""

    def __init__(self, oss_config_file=None):
        # read from config file or env var
        raise NotImplementedError("OSSStorage.__init__ to be implemented in the future")

    def read(self, filepath):
        raise NotImplementedError("OSSStorage.read to be implemented in the future")

    def read_text(self, filepath, encoding="utf-8"):
        raise NotImplementedError(
            "OSSStorage.read_text to be implemented in the future"
        )

    @contextlib.contextmanager
    def as_local_path(self, filepath: str) -> Generator[Union[str, Path], None, None]:
        """Download a file from ``filepath``.

        ``as_local_path`` is decorated by :meth:`contextlib.contextmanager`. It
        can be called with ``with`` statement, and when exiting from the
        ``with`` statement, the temporary path will be released.

        Args:
            filepath (str): Download a file from ``filepath``.

        Examples:
            >>> storage = OSSStorage()
            >>> # After exiting from the ``with`` clause,
            >>> # the path will be removed
            >>> with storage.as_local_path('oss://path/to/file') as path:
            ...     # do something here
        """
        with _as_temporary_file(self.read(filepath)) as local_path:
            yield local_path

    def write(self, obj: bytes, filepath: Union[str, Path]) -> None:
        raise NotImplementedError("OSSStorage.write to be implemented in the future")

    def write_text(
        self, obj: str, filepath: Union[str, Path], encoding: str = "utf-8"
    ) -> None:
        raise NotImplementedError(
            "OSSStorage.write_text to be implemented in the future"
        )


# Cache of instantiated storage backends, keyed by storage type
# ("local", "http", ...). Filled lazily by ``File._get_storage``.
G_STORAGES = {}


class File(object):
    """Facade that dispatches file operations to a backend chosen from the URI.

    URIs of the form ``<prefix>://...`` select the backend registered for
    ``<prefix>``; anything else is treated as a local path.
    """

    _prefix_to_storage: dict = {
        "oss": OSSStorage,
        "http": HTTPStorage,
        "https": HTTPStorage,
        "local": LocalStorage,
    }

    @staticmethod
    def _get_storage(uri):
        """Return the (cached) storage backend responsible for ``uri``.

        Args:
            uri (str or Path): Target location. Path-like objects always map
                to the local backend.

        Returns:
            Storage: Backend instance shared through ``G_STORAGES``.

        Raises:
            AssertionError: If ``uri`` is neither a string nor path-like, or
                if its prefix is not registered.
        """
        if isinstance(uri, os.PathLike):
            storage_type = "local"
        elif not isinstance(uri, str):
            raise AssertionError(f"uri should be str type, but got {type(uri)}")
        elif "://" not in uri:
            # local path
            storage_type = "local"
        else:
            # Split on the first separator only: the remainder may legitimately
            # contain "://" again (e.g. a URL passed as a query parameter).
            storage_type = uri.partition("://")[0]

        if storage_type not in File._prefix_to_storage:
            raise AssertionError(
                f"Unsupported uri {uri}, valid prefixs: "
                f"{list(File._prefix_to_storage.keys())}"
            )

        if storage_type not in G_STORAGES:
            G_STORAGES[storage_type] = File._prefix_to_storage[storage_type]()

        return G_STORAGES[storage_type]

    @staticmethod
    def read(uri: str) -> bytes:
        """Read data from a given ``uri`` as bytes.

        Args:
            uri (str or Path): Location to read data from.

        Returns:
            bytes: Expected bytes object.
        """
        storage = File._get_storage(uri)
        return storage.read(uri)

    @staticmethod
    def read_text(uri: Union[str, Path], encoding: str = "utf-8") -> str:
        """Read data from a given ``uri`` as text.

        Args:
            uri (str or Path): Location to read data from.
            encoding (str): The encoding used to decode the data. It is not
                applied to HTTP(S) URIs, whose text is decoded by
                ``requests``. Default: 'utf-8'.

        Returns:
            str: Expected text reading from ``uri``.
        """
        storage = File._get_storage(uri)
        if isinstance(storage, HTTPStorage):
            # HTTPStorage.read_text takes no encoding; keep its decoding as is.
            return storage.read_text(uri)
        return storage.read_text(uri, encoding=encoding)

    @staticmethod
    def write(obj: bytes, uri: Union[str, Path]) -> None:
        """Write bytes to a given ``uri``.

        Note:
            For local paths, ``write`` will create a directory if the
            directory of ``uri`` does not exist.

        Args:
            obj (bytes): Data to be written.
            uri (str or Path): Location to write data to.
        """
        storage = File._get_storage(uri)
        return storage.write(obj, uri)

    @staticmethod
    def write_text(obj: str, uri: str, encoding: str = "utf-8") -> None:
        """Write text to a given ``uri``.

        Note:
            For local paths, ``write_text`` will create a directory if the
            directory of ``uri`` does not exist.

        Args:
            obj (str): Data to be written.
            uri (str or Path): Location to write data to.
            encoding (str): The encoding used to encode ``obj``.
                Default: 'utf-8'.
        """
        storage = File._get_storage(uri)
        return storage.write_text(obj, uri, encoding=encoding)

    @staticmethod
    @contextlib.contextmanager
    def as_local_path(uri: str) -> Generator[Union[str, Path], None, None]:
        """Yield a local path for ``uri``, delegating to its storage backend.

        Local paths are yielded unchanged. Remote backends download to a
        temporary file that is removed when the ``with`` block is left.

        Args:
            uri (str): Location of the file.
        """
        storage = File._get_storage(uri)
        with storage.as_local_path(uri) as local_path:
            yield local_path