Spaces:
Running
Running
File size: 8,052 Bytes
21db53c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
from datetime import datetime
from io import BytesIO
from pathlib import PurePath
from typing import Annotated
from uuid import UUID
from PIL import Image, UnidentifiedImageError
from fastapi import APIRouter, Depends, HTTPException, params, UploadFile, File
from loguru import logger
from app.Models.api_models.admin_api_model import ImageOptUpdateModel, DuplicateValidationModel
from app.Models.api_models.admin_query_params import UploadImageModel
from app.Models.api_response.admin_api_response import ServerInfoResponse, ImageUploadResponse, \
DuplicateValidationResponse
from app.Models.api_response.base import NekoProtocol
from app.Models.errors import PointDuplicateError
from app.Models.img_data import ImageData
from app.Services.authentication import force_admin_token_verify
from app.Services.provider import ServiceProvider
from app.Services.vector_db_context import PointNotFoundError
from app.config import config
from app.util.generate_uuid import generate_uuid_from_sha1
from app.util.local_file_utility import VALID_IMAGE_EXTENSIONS
admin_router = APIRouter(dependencies=[Depends(force_admin_token_verify)], tags=["Admin"])
services: ServiceProvider | None = None
@admin_router.delete("/delete/{image_id}",
description="Delete image with the given id from database. "
"If the image is a local image, it will be moved to `/static/_deleted` folder.")
async def delete_image(
image_id: Annotated[UUID, params.Path(description="The id of the image you want to delete.")]) -> NekoProtocol:
try:
point = await services.db_context.retrieve_by_id(str(image_id))
except PointNotFoundError as ex:
raise HTTPException(404, "Cannot find the image with the given ID.") from ex
await services.db_context.deleteItems([str(point.id)])
logger.success("Image {} deleted from database.", point.id)
if config.storage.method.enabled: # local image
if point.local:
image_files = [itm[0] async for itm in
services.storage_service.active_storage.list_files("", f"{point.id}.*")]
assert len(image_files) <= 1
if not image_files:
logger.warning("Image {} is a local image but not found in static folder.", point.id)
else:
await services.storage_service.active_storage.move(image_files[0], f"_deleted/{image_files[0].name}")
logger.success("Image {} removed.", image_files[0].name)
if point.thumbnail_url is not None and (point.local or point.local_thumbnail):
thumbnail_file = PurePath(f"thumbnails/{point.id}.webp")
if await services.storage_service.active_storage.is_exist(thumbnail_file):
await services.storage_service.active_storage.delete(thumbnail_file)
logger.success("Thumbnail {} removed.", thumbnail_file.name)
else:
logger.warning("Thumbnail {} not found.", thumbnail_file.name)
return NekoProtocol(message="Image deleted.")
@admin_router.put("/update_opt/{image_id}", description="Update a image's optional information")
async def update_image(image_id: Annotated[UUID, params.Path(description="The id of the image you want to delete.")],
model: ImageOptUpdateModel) -> NekoProtocol:
if model.empty():
raise HTTPException(422, "Nothing to update.")
try:
point = await services.db_context.retrieve_by_id(str(image_id))
except PointNotFoundError as ex:
raise HTTPException(404, "Cannot find the image with the given ID.") from ex
if model.thumbnail_url is not None:
if point.local or point.local_thumbnail:
raise HTTPException(422, "Cannot change the thumbnail URL of a local image.")
point.thumbnail_url = model.thumbnail_url
if model.url is not None:
if point.local:
raise HTTPException(422, "Cannot change the URL of a local image.")
point.url = model.url
if model.starred is not None:
point.starred = model.starred
if model.categories is not None:
point.categories = model.categories
await services.db_context.updatePayload(point)
logger.success("Image {} updated.", point.id)
return NekoProtocol(message="Image updated.")
IMAGE_MIMES = {
"image/jpeg": "jpeg",
"image/png": "png",
"image/webp": "webp",
"image/gif": "gif",
}
@admin_router.post("/upload",
description="Upload image to server. The image will be indexed and stored in the database. If "
"local is set to true, the image will be uploaded to local storage.")
async def upload_image(image_file: Annotated[UploadFile, File(description="The image to be uploaded.")],
model: Annotated[UploadImageModel, Depends()]) -> ImageUploadResponse:
# generate an ID for the image
img_type = None
if image_file.content_type.lower() in IMAGE_MIMES:
img_type = IMAGE_MIMES[image_file.content_type.lower()]
elif image_file.filename:
extension = PurePath(image_file.filename).suffix.lower()
if extension in VALID_IMAGE_EXTENSIONS:
img_type = extension[1:]
if not img_type:
logger.warning("Failed to infer image format of the uploaded image. Content Type: {}, Filename: {}",
image_file.content_type, image_file.filename)
raise HTTPException(415, "Unsupported image format.")
img_bytes = await image_file.read()
try:
img_id = await services.upload_service.assign_image_id(img_bytes)
except PointDuplicateError as ex:
raise HTTPException(409,
f"The uploaded point is already contained in the database! entity id: {ex.entity_id}") \
from ex
try:
image = Image.open(BytesIO(img_bytes))
image.verify()
image.close()
except UnidentifiedImageError as ex:
logger.warning("Invalid image file from upload request. id: {}", img_id)
raise HTTPException(422, "Cannot open the image file.") from ex
image_data = ImageData(id=img_id,
url=model.url,
thumbnail_url=model.thumbnail_url,
local=model.local,
categories=model.categories,
starred=model.starred,
format=img_type,
index_date=datetime.now())
await services.upload_service.queue_upload_image(image_data, img_bytes, model.skip_ocr, model.local_thumbnail)
return ImageUploadResponse(message="OK. Image added to upload queue.", image_id=img_id)
@admin_router.get("/server_info", description="Get server information")
async def server_info() -> ServerInfoResponse:
return ServerInfoResponse(message="Successfully get server information!",
image_count=await services.db_context.get_counts(exact=True),
index_queue_length=services.upload_service.get_queue_size())
@admin_router.post("/duplication_validate",
description="Check if an image exists in the server by its SHA1 hash. If the image exists, "
"the image ID will be returned.\n"
"This is helpful for checking if an image is already in the server without "
"uploading the image.")
async def duplication_validate(model: DuplicateValidationModel) -> DuplicateValidationResponse:
ids = [generate_uuid_from_sha1(t) for t in model.hashes]
valid_ids = await services.db_context.validate_ids([str(t) for t in ids])
exists_matrix = [str(t) in valid_ids or t in services.upload_service.uploading_ids for t in ids]
return DuplicateValidationResponse(
exists=exists_matrix,
entity_ids=[(str(t) if exists else None) for (t, exists) in zip(ids, exists_matrix)],
message="Validation completed.")
|