"""HTTP API that converts uploaded PDF files to markdown.

The conversion itself is delegated to ``convert_pdf.convert_pdf``; this module
only handles the upload, manages the shared ``output`` directory and exposes
the generated artifacts over HTTP.
"""

import json
import os
import shutil
import tempfile
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles

from convert_pdf import convert_pdf

# Directory layout shared by the converter output, the static mount and /files.
OUTPUT_DIR = "output"
IMAGES_DIR = os.path.join(OUTPUT_DIR, "images")

# Response key -> suffix appended to the processed file's base name.
# Insertion order is the order of the keys in the "output_files" response.
OUTPUT_FILE_SUFFIXES = {
    "markdown": ".md",
    "layout": "_layout.pdf",
    "spans": "_spans.pdf",
    "model": "_model.pdf",
    "content_list": "_content_list.json",
    "middle_json": "_middle.json",
}

# Create output directories if they don't exist (StaticFiles requires the
# mounted directory to be present when the app is created).
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(IMAGES_DIR, exist_ok=True)

# Application metadata
app_description = """
# MinerU PDF Processor API

This API provides PDF processing capabilities using MinerU's magic-pdf library.
It extracts text content, tables, and generates markdown from PDF documents.

## Features:
- PDF text extraction
- Markdown conversion
- Layout analysis
"""

app = FastAPI(
    title="MinerU PDF API",
    description=app_description,
    version="1.0.0",
    contact={
        "name": "PDF Converter Service",
    },
)

# Add CORS middleware to allow cross-origin requests.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; restrict allow_origins if this service ever relies on cookies
# or other browser credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allow all methods
    allow_headers=["*"],  # Allow all headers
)

# Mount the output directory as static files
app.mount("/output", StaticFiles(directory=OUTPUT_DIR), name="output")


def _clear_previous_output() -> None:
    """Delete the artifacts left in the output directory by an earlier run.

    Everything inside the images directory is removed (including nested
    directories); in the output directory itself only regular files are
    removed, so its subdirectories stay in place.
    """
    # Recreate the directories in case they were deleted while running.
    os.makedirs(IMAGES_DIR, exist_ok=True)

    for item in os.listdir(IMAGES_DIR):
        item_path = os.path.join(IMAGES_DIR, item)
        # os.remove() fails on directories, which would otherwise break every
        # following request; symlinks are unlinked rather than followed.
        if os.path.isdir(item_path) and not os.path.islink(item_path):
            shutil.rmtree(item_path)
        else:
            os.remove(item_path)

    for item in os.listdir(OUTPUT_DIR):
        item_path = os.path.join(OUTPUT_DIR, item)
        if os.path.isfile(item_path):
            os.remove(item_path)


def _collect_output_files(base_name: str) -> Dict[str, str]:
    """Map each artifact that exists for ``base_name`` to its ``/output`` URL."""
    found = {}
    for key, suffix in OUTPUT_FILE_SUFFIXES.items():
        artifact_name = f"{base_name}{suffix}"
        if os.path.exists(os.path.join(OUTPUT_DIR, artifact_name)):
            found[key] = f"/output/{artifact_name}"
    return found


# Health check endpoint
@app.get("/health", tags=["Health"])
async def health_check() -> Dict[str, Any]:
    """
    Health check endpoint to verify the service is running.

    Returns the service status and current time.
    """
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "service": "mineru-pdf-processor"
    }


@app.post("/convert", tags=["PDF Processing"])
async def convert(file: UploadFile = File(...)) -> Dict[str, Any]:
    """
    Convert a PDF file to markdown using the magic-pdf library.

    Output files from any previous conversion are deleted before the new
    one starts, so links returned by earlier calls stop working.

    Parameters:
        file: The PDF file to process

    Returns:
        A JSON object containing the conversion result and links to output files.
        If processing fails, a 500 JSON response describing the error is
        returned instead.
    """
    if not file.filename or not file.filename.lower().endswith('.pdf'):
        raise HTTPException(status_code=400, detail="Invalid file. Please upload a PDF file.")

    content = await file.read()
    temp_pdf_path = None

    try:
        # Save the uploaded PDF to a temporary file
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf:
            # Record the path before writing so the file is still cleaned up
            # in ``finally`` when the write itself fails.
            temp_pdf_path = temp_pdf.name
            temp_pdf.write(content)

        # Clear previous output files
        _clear_previous_output()

        # Process the PDF using convert_pdf function.
        # NOTE(review): this is called synchronously inside an async endpoint,
        # so other requests wait while it runs; presumably that also keeps
        # conversions from racing on the shared output directory - confirm
        # before moving it to a worker thread.
        md_content = convert_pdf(temp_pdf_path)

        # The output files are looked up under the temporary file's base
        # name, not under the uploaded file's name.
        filename_without_ext = os.path.splitext(os.path.basename(temp_pdf_path))[0]

        return {
            "filename": file.filename,
            "status": "success",
            "markdown_content": md_content,
            "output_files": _collect_output_files(filename_without_ext)
        }

    except Exception as e:
        error_detail = str(e)
        error_trace = traceback.format_exc()

        # Log the error
        print(f"Error processing PDF: {error_detail}")
        print(error_trace)

        return JSONResponse(
            status_code=500,
            content={
                "error": "Error processing PDF",
                "detail": error_detail,
                "filename": file.filename
            }
        )

    finally:
        # Clean up the temporary file (best effort: a failure here must not
        # replace the response, but it should not go unnoticed either).
        if temp_pdf_path and os.path.exists(temp_pdf_path):
            try:
                os.unlink(temp_pdf_path)
            except OSError as cleanup_error:
                print(f"Could not remove temporary file {temp_pdf_path}: {cleanup_error}")


@app.get("/files/{filename}", tags=["Files"])
async def get_file(filename: str):
    """
    Get a file from the output directory.

    Only regular files located inside the output directory are served;
    directories and names that resolve outside of it are reported as missing.

    Parameters:
        filename: The name of the file to retrieve

    Returns:
        The requested file

    Raises:
        HTTPException: 404 if the file does not exist or may not be served
    """
    not_found = HTTPException(status_code=404, detail=f"File {filename} not found")

    try:
        output_root = os.path.realpath(OUTPUT_DIR)
        file_path = os.path.realpath(os.path.join(output_root, filename))
        # Reject anything that escapes the output directory ("..", absolute
        # paths, symlinks pointing elsewhere).
        is_inside = os.path.commonpath([output_root, file_path]) == output_root
    except ValueError:
        # Raised for embedded NUL bytes or, on Windows, paths on another drive.
        raise not_found

    # exists() alone would also accept directories, which cannot be served.
    if not is_inside or not os.path.isfile(file_path):
        raise not_found

    return FileResponse(path=file_path)


if __name__ == "__main__":
    import uvicorn

    # "api:app" assumes this module is importable as ``api``.
    uvicorn.run("api:app", host="0.0.0.0", port=7860, reload=False)