Spaces:
Running
Running
import tempfile | |
import random | |
import requests | |
import json | |
import logging | |
from PIL import Image | |
from huggingface_hub import HfApi | |
from cozepy import Coze, TokenAuth | |
import hashlib | |
import os | |
from app.config import DATASET_ID, COZE_API_TOKEN, HUGGING_FACE_TOKEN | |
# Configure logging: INFO level, each record prefixed with timestamp and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
)
logger = logging.getLogger(__name__)

# Initialise the API clients.
api = HfApi()
# The Coze client is currently disabled:
# coze = Coze(auth=TokenAuth(token=COZE_API_TOKEN), base_url="https://api.coze.cn")
def calculate_image_hash(Image: Image.Image) -> str:
    """Return the MD5 hex digest of the bytes produced by ``Image.tobytes()``.

    Args:
        Image: The PIL image to fingerprint. Inside this function the
            parameter name shadows the imported ``Image`` module; it is kept
            because it is part of the call signature.

    Returns:
        str: The hexadecimal MD5 digest string.
    """
    hasher = hashlib.md5()
    hasher.update(Image.tobytes())
    return hasher.hexdigest()
def get_image_description(image_url: str) -> str:
    """Return a text description for the image at ``image_url``.

    The Coze workflow that produced descriptions is currently disabled (see
    the commented-out reference implementation below), so this function
    always returns an empty string. Returning ``""`` rather than ``None``
    honours the ``-> str`` annotation and matches the fallback value the
    disabled implementation used when the workflow yielded no data.

    Args:
        image_url: URL of the image to describe (unused while the workflow
            is disabled).

    Returns:
        str: The description, or ``""`` when none is available.
    """
    # Disabled reference implementation, kept for when the workflow returns:
    # logger.info(f"Get image description")
    # workflow = coze.workflows.runs.create(
    #     workflow_id='7479742935953752091',
    #     parameters={
    #         "image_url": image_url
    #     }
    # )
    # logger.info(f"Image description: {workflow.data}")
    # if (workflow.data):
    #     description = json.loads(workflow.data)['output']
    #     return description
    # else:
    #     return ""
    return ""
def save_image_temp(image: Image.Image) -> str:
    """Save ``image`` as a PNG in a new temporary file and return its path.

    The file is created with ``delete=False``, so it persists after this
    function returns and the caller is responsible for removing it.

    Args:
        image: The PIL image to write.

    Returns:
        str: Path of the temporary ``.png`` file.

    Raises:
        Exception: Whatever ``image.save`` raises is propagated unchanged;
            in that case the partially written temporary file is removed
            first so failed saves do not leave orphaned files behind.
    """
    # Reserve a unique file name, then close the handle before writing by name.
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_img:
        temp_path = temp_img.name

    try:
        image.save(temp_path, "PNG")
    except Exception:
        # delete=False means nothing else will clean this file up.
        try:
            os.remove(temp_path)
        except OSError:
            # Best-effort cleanup; the original save error is what matters.
            pass
        raise
    return temp_path
def upload_to_huggingface(temp_file_path: str) -> tuple:
    """Upload a local image file to the Hugging Face dataset repository.

    The file is stored under ``images/`` with a generated name of the form
    ``image_<4-digit number>.png``.

    Args:
        temp_file_path: Path of the local file to upload.

    Returns:
        tuple: ``(file_path, image_filename)`` where ``file_path`` is the
        path inside the repository and ``image_filename`` its base name.
    """
    # NOTE(review): only 9000 distinct names are possible, so two uploads can
    # pick the same name -- confirm whether overwriting is acceptable.
    number = random.randint(1000, 9999)
    image_filename = f"image_{number}.png"
    file_path = f"images/{image_filename}"

    logger.info(f"Uploading image to HuggingFace: {file_path}")
    upload_options = {
        "path_or_fileobj": temp_file_path,
        "path_in_repo": file_path,
        "repo_id": DATASET_ID,
        "token": HUGGING_FACE_TOKEN,
        "repo_type": "dataset",
    }
    api.upload_file(**upload_options)
    logger.info(f"Image uploaded successfully: {file_path}")

    return file_path, image_filename
def upload_folder_to_huggingface(folder_path: str) -> None:
    """Upload a local directory to the Hugging Face dataset repository.

    The directory contents are placed under ``images/`` in the repository.

    Args:
        folder_path: Path of the local directory to upload.
    """
    logger.info(f"Uploading folder to HuggingFace: {folder_path}")
    upload_options = {
        "folder_path": folder_path,
        "path_in_repo": "images/",
        "repo_id": DATASET_ID,
        "token": HUGGING_FACE_TOKEN,
        "repo_type": "dataset",
    }
    api.upload_folder(**upload_options)
    logger.info(f"Image uploaded successfully: {folder_path}")
def get_image_cdn_url(file_path: str) -> str:
    """Resolve the final URL of an image in the dataset repository.

    Sends a ``HEAD`` request to the repository's ``resolve/main`` URL,
    follows redirects, and returns the URL the response ended up at.

    Args:
        file_path: Path of the image inside the dataset repository.

    Returns:
        str: The URL of the final response after redirects.
    """
    image_url = f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/{file_path}"
    logger.info(f"Getting CDN URL for: {image_url}")

    request_headers = {'User-Agent': 'NekoAI'}
    response = requests.head(
        image_url,
        allow_redirects=True,
        timeout=10,
        headers=request_headers,
    )

    # After following redirects, ``response.url`` is the final location.
    image_cdn_url = response.url
    logger.info(f"CDN URL: {image_cdn_url}")
    return image_cdn_url
def format_image_url(file_path: str) -> str:
    """Format an image path into something that can be displayed.

    Args:
        file_path (str): Image path; either a local path or a path inside
            the Hugging Face dataset repository.

    Returns:
        str: The absolute local path when ``file_path`` exists on disk,
        otherwise the full ``resolve/main`` URL for the repository path.
    """
    # Not on the local filesystem: treat it as a repository path.
    if not os.path.exists(file_path):
        return f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/{file_path}"

    # Local file: hand back its absolute path.
    return os.path.abspath(file_path)
def generate_temp_image(temp_dir: str, image: Image.Image, image_filename: str) -> str:
    """Write ``image`` as a PNG named ``image_filename`` inside ``temp_dir``.

    Args:
        temp_dir (str): Directory to write into; created if it is missing.
        image (Image.Image): The PIL image to save.
        image_filename (str): File name for the image, e.g. ``image_XXXXXX.png``.

    Returns:
        str: Path of the written image file.

    Raises:
        ValueError: If the directory does not exist and cannot be created.
        IOError: If building the path or saving the image fails.
    """
    # Tracks which phase failed. ``IOError`` is an alias of ``OSError``, so the
    # exception type alone cannot tell a directory failure from a save failure.
    dir_ready = False
    try:
        # Make sure the target directory exists; ``exist_ok`` covers the case
        # where another process creates it between the check and the call.
        if not os.path.exists(temp_dir):
            os.makedirs(temp_dir, exist_ok=True)
            logger.info(f"Created temporary directory: {temp_dir}")
        dir_ready = True

        # Build the destination path and write the image.
        temp_file_path = os.path.join(temp_dir, image_filename)
        image.save(temp_file_path, "PNG")
        logger.info(f"Generated temporary image: {temp_file_path}")
        return temp_file_path
    except Exception as e:
        if isinstance(e, OSError) and not dir_ready:
            logger.error(f"Failed to create temporary directory: {str(e)}")
            raise ValueError(f"Failed to create temporary directory: {str(e)}") from e
        logger.error(f"Failed to generate temporary image: {str(e)}")
        raise IOError(f"Failed to generate temporary image: {str(e)}") from e