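# Spaces: Sleeping
# (The line above appears to be hosting-page status text captured together
# with this file; it is not part of the program.)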
import json
import logging
import os
import shutil
import tempfile
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles

from convert_pdf import convert_pdf
# Create the output directories if they don't exist. Creating the nested
# "output/images" path also creates its parent "output" directory.
os.makedirs("output/images", exist_ok=True)
# Application metadata: Markdown text passed to FastAPI as the API description.
app_description = """
# MinerU PDF Processor API
This API provides PDF processing capabilities using MinerU's magic-pdf library.
It extracts text content, tables, and generates markdown from PDF documents.
## Features:
- PDF text extraction
- Markdown conversion
- Layout analysis
"""
# The ASGI application object; title, description, version and contact are
# passed to FastAPI as the API's metadata.
app = FastAPI(
    title="MinerU PDF API",
    description=app_description,
    version="1.0.0",
    contact={
        "name": "PDF Converter Service",
    },
)

# Add CORS middleware to allow cross-origin requests.
# NOTE(review): wildcard origins are combined with allow_credentials=True.
# Browsers do not accept a literal "*" origin on credentialed requests, so
# confirm whether credentials are actually needed; if they are, list the
# permitted origins explicitly instead of using "*".
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allow all methods
    allow_headers=["*"],  # Allow all headers
)

# Mount the output directory as static files so that generated artifacts are
# reachable under the /output URL prefix.
app.mount("/output", StaticFiles(directory="output"), name="output")
# Health check endpoint
# NOTE(review): no route decorator is visible on this function in this file;
# confirm that it is registered with the app.
async def health_check() -> Dict[str, Any]:
    """
    Health check endpoint to verify the service is running.

    Returns:
        A dict with the fixed keys "status" and "service" plus "timestamp",
        the current time as an ISO 8601 string that includes the UTC offset.
    """
    return {
        "status": "healthy",
        # astimezone() attaches the local UTC offset so the timestamp is
        # unambiguous for clients running in a different time zone.
        "timestamp": datetime.now().astimezone().isoformat(),
        "service": "mineru-pdf-processor",
    }
# Maps each key of the "output_files" response field to the suffix appended to
# the processed file's base name inside the output directory.
_OUTPUT_SUFFIXES = {
    "markdown": ".md",
    "layout": "_layout.pdf",
    "spans": "_spans.pdf",
    "model": "_model.pdf",
    "content_list": "_content_list.json",
    "middle_json": "_middle.json",
}


def _clear_previous_outputs() -> None:
    """Remove the results of the previous conversion from the output directory."""
    images_dir = "output/images"
    # Recreate the directories if they were removed while the service ran.
    os.makedirs(images_dir, exist_ok=True)
    for entry in os.listdir(images_dir):
        entry_path = os.path.join(images_dir, entry)
        if os.path.isdir(entry_path) and not os.path.islink(entry_path):
            # os.remove() raises on directories, so remove them recursively.
            shutil.rmtree(entry_path)
        else:
            os.remove(entry_path)
    # Only plain files are removed at the top level; subdirectories such as
    # "images" are kept.
    for entry in os.listdir("output"):
        entry_path = os.path.join("output", entry)
        if os.path.isfile(entry_path):
            os.remove(entry_path)


def _collect_output_files(stem: str) -> Dict[str, str]:
    """Return the URL of every known output artifact that exists for *stem*."""
    return {
        key: f"/output/{stem}{suffix}"
        for key, suffix in _OUTPUT_SUFFIXES.items()
        if os.path.exists(os.path.join("output", f"{stem}{suffix}"))
    }


# NOTE(review): no route decorator is visible on this function in this file;
# confirm that it is registered with the app.
async def convert(file: UploadFile = File(...)) -> Dict[str, Any]:
    """
    Convert a PDF file to markdown using the magic-pdf library.

    The upload is written to a temporary file, the previous contents of the
    output directory are removed, and convert_pdf() is called with the
    temporary path. The response then lists whichever known artifacts exist
    in the output directory under the temporary file's base name.

    Parameters:
        file: The PDF file to process

    Returns:
        A JSON object containing the conversion result and links to output
        files. If processing fails, a JSONResponse with status 500 and the
        keys "error", "detail" and "filename" is returned instead.

    Raises:
        HTTPException: 400 when the upload has no ".pdf" file name or is empty.
    """
    if not file.filename or not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Invalid file. Please upload a PDF file.")
    content = await file.read()
    if not content:
        # Reject an empty upload up front instead of failing inside the converter.
        raise HTTPException(status_code=400, detail="Invalid file. The uploaded PDF is empty.")

    logger = logging.getLogger(__name__)
    temp_pdf_path = None
    try:
        # Save the uploaded PDF to a temporary file. The path is recorded
        # before writing so the file is still cleaned up if the write fails.
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf:
            temp_pdf_path = temp_pdf.name
            temp_pdf.write(content)

        # NOTE(review): the output directory is shared by all requests, so a
        # new conversion removes the files a previous response linked to.
        _clear_previous_outputs()

        # Process the PDF using convert_pdf function
        md_content = convert_pdf(temp_pdf_path)

        # Output artifacts are looked up under the temporary file's base name.
        stem = os.path.splitext(os.path.basename(temp_pdf_path))[0]
        return {
            "filename": file.filename,
            "status": "success",
            "markdown_content": md_content,
            "output_files": _collect_output_files(stem),
        }
    except Exception as exc:
        # Top-level boundary: log with traceback and report a 500 to the client.
        logger.exception("Error processing PDF: %s", exc)
        return JSONResponse(
            status_code=500,
            content={
                "error": "Error processing PDF",
                "detail": str(exc),
                "filename": file.filename,
            },
        )
    finally:
        # Clean up the temporary file; failure here must not mask the response.
        if temp_pdf_path is not None:
            try:
                os.unlink(temp_pdf_path)
            except FileNotFoundError:
                pass
            except OSError:
                logger.warning(
                    "Could not remove temporary file %s", temp_pdf_path, exc_info=True
                )
# NOTE(review): no route decorator is visible on this function in this file;
# confirm that it is registered with the app.
async def get_file(filename: str):
    """
    Get a file from the output directory.

    Parameters:
        filename: The name of the file to retrieve, relative to the output
            directory

    Returns:
        The requested file

    Raises:
        HTTPException: 404 when the name does not resolve to a regular file
            located inside the output directory.
    """
    output_root = os.path.realpath("output")
    try:
        file_path = os.path.realpath(os.path.join(output_root, filename))
    except ValueError:
        # Raised for names the OS cannot represent (e.g. an embedded NUL byte).
        file_path = ""
    # Resolve the path first, then require it to stay below the output
    # directory: this rejects "../" sequences and absolute paths that would
    # otherwise let a caller read arbitrary files.
    inside_output = file_path.startswith(output_root + os.sep)
    if not inside_output or not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail=f"File {filename} not found")
    return FileResponse(path=file_path)
if __name__ == "__main__":
    import uvicorn

    # Pass the application object itself rather than an "api:app" import
    # string, so that startup does not depend on this file being named api.py
    # and the module is not imported a second time.
    uvicorn.run(app, host="0.0.0.0", port=7860, reload=False)