# Spaces: Running  (page-status residue from the hosting UI, not program code)
import io
from typing import List

import docx
import easyocr
import numpy as np
import openpyxl
import pdfplumber
import pytesseract
from fastapi import FastAPI, File, Form, UploadFile
from PIL import Image
from pptx import Presentation
from transformers import pipeline
# FastAPI application object.
app = FastAPI()

# Load the Hugging Face pipelines once, at import time, so the models are
# built a single time and shared by every call below.
qa_pipeline = pipeline(
    task="question-answering",
    model="deepset/roberta-base-squad2",
)
# For images.
# NOTE(review): this is a VQA checkpoint loaded under the "image-to-text"
# task, and nothing visible in this file calls it - confirm it is needed
# and that the task matches the model.
vqa_pipeline = pipeline(
    task="image-to-text",
    model="Salesforce/blip-vqa-base",
)
def extract_text_from_pdf(pdf_file):
    """Return the text of every page of a PDF.

    Args:
        pdf_file: Anything ``pdfplumber.open`` accepts (the caller in this
            file passes an in-memory ``io.BytesIO``).

    Returns:
        The page texts joined by newlines, with leading and trailing
        whitespace stripped. A page without extractable text contributes an
        empty string, so a PDF with no text at all yields ``""``.
    """
    with pdfplumber.open(pdf_file) as pdf:
        # extract_text() may give None for a page that has no text layer
        # (for example a scanned image). Treat that as empty rather than
        # letting ``None + "\n"`` raise TypeError for the whole document.
        page_texts = [page.extract_text() or "" for page in pdf.pages]
    return "\n".join(page_texts).strip()
def extract_text_from_docx(docx_file):
    """Return the paragraph text of a Word document, one paragraph per line.

    Args:
        docx_file: Anything ``docx.Document`` accepts (the caller in this
            file passes an in-memory ``io.BytesIO``).

    Returns:
        The ``text`` of each entry in ``document.paragraphs``, joined by
        newlines. Only ``document.paragraphs`` is read.
    """
    document = docx.Document(docx_file)
    paragraph_texts = (paragraph.text for paragraph in document.paragraphs)
    return "\n".join(paragraph_texts)
def extract_text_from_pptx(pptx_file):
    """Return the text of every text-bearing shape in a PowerPoint deck.

    Args:
        pptx_file: Anything ``Presentation`` accepts (the caller in this
            file passes an in-memory ``io.BytesIO``).

    Returns:
        The ``text`` of each shape that exposes a ``text`` attribute, in
        slide order then shape order, joined by newlines.
    """
    deck = Presentation(pptx_file)
    shape_texts = [
        shape.text
        for slide in deck.slides
        for shape in slide.shapes
        # Shapes without a ``text`` attribute are skipped.
        if hasattr(shape, "text")
    ]
    return "\n".join(shape_texts)
def extract_text_from_excel(excel_file):
    """Return the cell values of every worksheet as plain text.

    Args:
        excel_file: Anything ``openpyxl.load_workbook`` accepts (the caller
            in this file passes an in-memory ``io.BytesIO``).

    Returns:
        One line per non-empty row, across all worksheets in order. The
        values in a row are converted with ``str`` and separated by single
        spaces; empty cells and entirely empty rows are left out.
    """
    workbook = openpyxl.load_workbook(excel_file)
    lines = []
    for sheet in workbook.worksheets:
        for row in sheet.iter_rows(values_only=True):
            # Empty cells come back as None; str(None) would inject the
            # literal word "None" into the text used as QA context.
            cells = [str(value) for value in row if value is not None]
            if cells:
                lines.append(" ".join(cells))
    return "\n".join(lines)
def _get_ocr_reader():
    """Return the shared English EasyOCR reader, creating it on first use."""
    reader = getattr(_get_ocr_reader, "_reader", None)
    if reader is None:
        # Constructing a Reader loads the OCR models, so build it once and
        # keep it on the function object instead of rebuilding per call.
        reader = easyocr.Reader(["en"])
        _get_ocr_reader._reader = reader
    return reader


def extract_text_from_image(image_file):
    """Run English OCR on an image and return the recognised text.

    Args:
        image_file: A PIL image, or any input ``Reader.readtext`` accepts
            directly (file path, URL, raw bytes, or an OpenCV-style array).

    Returns:
        The recognised text fragments joined by single spaces; ``""`` when
        nothing is detected.
    """
    if isinstance(image_file, Image.Image):
        # EasyOCR only special-cases some PIL image classes, so other
        # formats (PNG, etc.) would be rejected. Hand it a plain array
        # instead. Arrays are treated as OpenCV images, hence RGB -> BGR.
        rgb = np.asarray(image_file.convert("RGB"))
        image_file = np.ascontiguousarray(rgb[:, :, ::-1])
    detections = _get_ocr_reader().readtext(image_file)
    # Each detection is indexed as (bounding box, text, ...); keep the text.
    return " ".join(detection[1] for detection in detections)
async def qa_document(file: UploadFile = File(...), question: str = Form(...)):
    """Answer a question using the text of an uploaded document.

    The extractor is chosen from the part of the file name after the last
    dot (case-insensitive): ``pdf``, ``docx``, ``pptx`` or ``xlsx``.

    NOTE(review): no route decorator is visible on this function in this
    view of the file - confirm the endpoint is registered with ``app``.

    Args:
        file: The uploaded document.
        question: The question to answer from the document text.

    Returns:
        ``{"question": ..., "answer": ...}`` on success, or
        ``{"error": ...}`` when the format is unsupported or no text could
        be extracted.
    """
    # Built inside the function so the lookup happens at call time.
    extractors = {
        "pdf": extract_text_from_pdf,
        "docx": extract_text_from_docx,
        "pptx": extract_text_from_pptx,
        "xlsx": extract_text_from_excel,
    }

    # An upload may arrive without a file name; treat that as an unknown
    # format instead of failing on ``None.split``.
    filename = file.filename or ""
    file_ext = filename.rsplit(".", 1)[-1].lower()
    extractor = extractors.get(file_ext)
    if extractor is None:
        return {"error": "Unsupported file format!"}

    text = extractor(io.BytesIO(await file.read()))
    # Whitespace-only text gives the model nothing to answer from.
    if not text or not text.strip():
        return {"error": "No text extracted from the document."}

    response = qa_pipeline(question=question, context=text)
    return {"question": question, "answer": response["answer"]}
async def qa_image(file: UploadFile = File(...), question: str = Form(...)):
    """Answer a question using the text recognised in an uploaded image.

    NOTE(review): no route decorator is visible on this function in this
    view of the file - confirm the endpoint is registered with ``app``.

    Args:
        file: The uploaded image.
        question: The question to answer from the recognised text.

    Returns:
        ``{"question": ..., "answer": ...}`` on success, or
        ``{"error": ...}`` when the upload is not a readable image or no
        text is detected in it.
    """
    payload = await file.read()
    try:
        image = Image.open(io.BytesIO(payload))
        # Image.open only parses the header; load() forces the full decode
        # so truncated or corrupt data is reported here as well.
        image.load()
    except OSError:
        # Pillow's "cannot identify image file" error is an OSError
        # subclass, so this covers non-image uploads and broken files.
        return {"error": "Uploaded file is not a readable image."}

    image_text = extract_text_from_image(image)
    # Whitespace-only OCR output gives the model nothing to answer from.
    if not image_text or not image_text.strip():
        return {"error": "No text detected in the image."}

    response = qa_pipeline(question=question, context=image_text)
    return {"question": question, "answer": response["answer"]}