# Spaces: Running  (hosting-page status banner captured with the source; not part of the program)
from fastapi import FastAPI, File, UploadFile, Form, HTTPException | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.responses import JSONResponse | |
import uvicorn | |
import os | |
from typing import List, Optional | |
import shutil | |
from pathlib import Path | |
import uuid | |
# Import AI functionality modules | |
from modules.document_processor import process_document | |
from modules.image_processor import process_image | |
# Directory that receives uploaded files; created at import time if missing.
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

# The ASGI application object (referenced as "main:app" when launched as a script).
app = FastAPI(title="AI Document Analysis API")

# Serve the contents of the "frontend" directory (resolved relative to the
# current working directory) under the /static URL prefix.
app.mount(
    "/static",
    StaticFiles(directory="frontend"),
    name="static",
)
def read_root():
    """Return a fixed message stating that the API is running."""
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file -- confirm how it is registered on `app`.
    payload = {"message": "AI Document Analysis API is running"}
    return payload
async def analyze_document(
    file: UploadFile = File(...),
    analysis_type: str = Form(...)
):
    """Save an uploaded document and run the requested analysis on it.

    Args:
        file: The uploaded document. Its extension (case-insensitive) must be
            one of .pdf, .docx, .pptx, .xlsx or .xls.
        analysis_type: The analysis to perform; only "summarize" is accepted.

    Returns:
        A dict with the client-supplied filename, the analysis type, and the
        value returned by ``process_document``.

    Raises:
        HTTPException: 400 when the extension or the analysis type is not
            supported; 500 when saving or processing the file fails.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file -- confirm how it is registered on `app`.

    # Validate file type. A missing filename is treated as "no extension" so
    # it is rejected with a 400 instead of crashing inside splitext().
    allowed_extensions = [".pdf", ".docx", ".pptx", ".xlsx", ".xls"]
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext not in allowed_extensions:
        raise HTTPException(status_code=400, detail=f"File type not supported. Allowed types: {', '.join(allowed_extensions)}")

    # Validate the analysis type *before* touching the disk. Raising this
    # inside the try block below would let the generic handler turn the 400
    # into a 500.
    if analysis_type != "summarize":
        raise HTTPException(status_code=400, detail="Invalid analysis type")

    # Store the upload under a random name so client-supplied names can
    # neither collide nor influence the path.
    unique_filename = f"{uuid.uuid4()}{file_ext}"
    file_path = UPLOAD_DIR / unique_filename

    try:
        # Saving is inside the try so a partially written file is removed too.
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        result = process_document(str(file_path), "summarize")
    except Exception as e:
        # Clean up file on error
        if file_path.exists():
            os.remove(file_path)
        # Keep the status code of an HTTPException raised further down.
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=str(e)) from e

    # NOTE(review): the saved file is left in UPLOAD_DIR after a successful
    # analysis -- confirm that retaining uploads is intended.
    return {
        "filename": file.filename,
        "analysis_type": analysis_type,
        "result": result
    }
async def analyze_image(
    file: UploadFile = File(...),
    analysis_type: str = Form(...)
):
    """Save an uploaded image and run the requested analysis on it.

    Args:
        file: The uploaded image. Its extension (case-insensitive) must be
            one of .jpg, .jpeg or .png.
        analysis_type: The analysis to perform; only "caption" is accepted.

    Returns:
        A dict with the client-supplied filename, the analysis type, and the
        value returned by ``process_image``.

    Raises:
        HTTPException: 400 when the extension or the analysis type is not
            supported; 500 when saving or processing the file fails.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file -- confirm how it is registered on `app`.

    # Validate file type. A missing filename is treated as "no extension" so
    # it is rejected with a 400 instead of crashing inside splitext().
    allowed_extensions = [".jpg", ".jpeg", ".png"]
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext not in allowed_extensions:
        raise HTTPException(status_code=400, detail=f"File type not supported. Allowed types: {', '.join(allowed_extensions)}")

    # Validate the analysis type *before* touching the disk. Raising this
    # inside the try block below would let the generic handler turn the 400
    # into a 500.
    if analysis_type != "caption":
        raise HTTPException(status_code=400, detail="Invalid analysis type")

    # Store the upload under a random name so client-supplied names can
    # neither collide nor influence the path.
    unique_filename = f"{uuid.uuid4()}{file_ext}"
    file_path = UPLOAD_DIR / unique_filename

    try:
        # Saving is inside the try so a partially written file is removed too.
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        result = process_image(str(file_path), "caption")
    except Exception as e:
        # Clean up file on error
        if file_path.exists():
            os.remove(file_path)
        # Keep the status code of an HTTPException raised further down.
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=str(e)) from e

    # NOTE(review): the saved file is left in UPLOAD_DIR after a successful
    # analysis -- confirm that retaining uploads is intended.
    return {
        "filename": file.filename,
        "analysis_type": analysis_type,
        "result": result
    }
# Health check endpoint
def health_check():
    """Return a fixed payload reporting the service as healthy."""
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file -- confirm how it is registered on `app`.
    status_report = {"status": "healthy"}
    return status_report
if __name__ == "__main__":
    # Script entry point: start uvicorn on all interfaces at port 7860 with
    # auto-reload enabled. The app is passed as the import string "main:app",
    # so this presumably expects the module to be named main.py -- verify.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=7860,
        reload=True,
    )