# Hugging Face Space (status when captured: Sleeping)
import os | |
import tempfile | |
import time | |
import re | |
import logging | |
from datetime import datetime | |
import gradio as gr | |
import google.generativeai as genai | |
from PyPDF2 import PdfReader | |
from tika import parser | |
# Configure logging: every record goes to both the console and a log file.
tmp_log = "pdf_processor_log.txt"
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(tmp_log),
    ],
)
logger = logging.getLogger("pdf_processor")
# Attempt to import Unstructured.io partitioning (optional dependency).
# UNSTRUCTURED_AVAILABLE records whether the import succeeded.
try:
    from unstructured.partition.pdf import partition_pdf
except ImportError:
    UNSTRUCTURED_AVAILABLE = False
    logger.warning("unstructured.partition.pdf not available; skipping that extraction method")
else:
    UNSTRUCTURED_AVAILABLE = True
# Load API key from environment; Gemini is configured only when a key is set.
API_KEY = os.getenv("GOOGLE_API_KEY", None)
if API_KEY:
    genai.configure(api_key=API_KEY)
else:
    logger.warning("GOOGLE_API_KEY not set in environment.")
# Globals to store state (process_pdf declares all three as `global`)
EXTRACTED_TEXT: str = ""
PDF_SECTIONS: list = []
EXTRACTION_METHOD: str = ""
# --- Extraction Functions --- | |
def extract_text_with_unstructured(pdf_path):
    """Split a PDF into titled sections using Unstructured.io's ``partition_pdf``.

    Elements are scanned in order. An element shorter than 80 characters that
    is all upper-case, ends with a colon, or starts with digits followed by
    whitespace is treated as a heading and opens a new section; every other
    non-empty element is appended to the current section's content.

    Args:
        pdf_path: Path of the PDF, passed to ``partition_pdf`` as ``filename``.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts. Text before the
        first heading is filed under "Introduction". Sections without content
        are dropped, so the list is empty when no body text is found.
    """
    logger.info("Extracting via Unstructured.io...")
    elements = partition_pdf(filename=pdf_path, extract_images_in_pdf=False)

    sections = []
    title, chunks = "Introduction", []
    for element in elements:
        # An element may lack ``text`` or carry ``None``; treat both as empty.
        text = (getattr(element, "text", None) or "").strip()
        if not text:
            continue
        is_heading = len(text) < 80 and (
            text.isupper()
            or text.endswith(':')
            or re.match(r'^[0-9]+\.?\s+', text)
        )
        if is_heading:
            # Close the running section only if it collected any body text.
            if chunks:
                sections.append({"title": title, "content": "".join(chunks)})
            title, chunks = text, []
        else:
            chunks.append(text + "\n\n")
    if chunks:
        sections.append({"title": title, "content": "".join(chunks)})
    return sections
def extract_text_with_pypdf(pdf_path):
    """Extract text with PyPDF2 and split it into titled sections.

    Each page's text is prefixed with a ``--- Page N ---`` marker and the pages
    are concatenated. The result is split on lines that look like headings:
    an upper-case run optionally ending in a colon, or ``<digits>. <Capital>...``.

    Args:
        pdf_path: Path of the PDF, passed to ``PdfReader``.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts. Text that appears
        before the first heading is returned as an "Introduction" section.
        When no heading is found, a single "Document" section holding the full
        text is returned.
    """
    logger.info("Extracting via PyPDF2...")
    reader = PdfReader(pdf_path)
    pages = []
    for number, page in enumerate(reader.pages, 1):
        if (page_text := page.extract_text()):
            pages.append(f"\n\n--- Page {number} ---\n\n{page_text}")
    full = "".join(pages)

    # With a capturing group, re.split returns
    # [preamble, title1, body1, title2, body2, ...].
    parts = re.split(r"\n\s*([A-Z][A-Z\s]+:?|[0-9]+\.\s+[A-Z].*?)\s*\n", full)
    if len(parts) > 1:
        sections = []
        preamble = parts[0].strip()
        # Keep the text before the first heading instead of dropping it,
        # unless it consists of nothing but page markers.
        if re.sub(r"--- Page [0-9]+ ---", "", preamble).strip():
            sections.append({"title": "Introduction", "content": preamble})
        sections.extend(
            {"title": heading.strip(), "content": body.strip()}
            for heading, body in zip(parts[1::2], parts[2::2])
        )
        return sections
    # fallback to single section
    return [{"title": "Document", "content": full}]
def extract_text_with_tika(pdf_path):
    """Extract text with Apache Tika and group its lines into titled sections.

    Blank lines are skipped. A line shorter than 80 characters that is all
    upper-case, ends with a colon, or starts with digits followed by
    whitespace and a capital letter opens a new section; every other line is
    appended to the current section's content.

    Args:
        pdf_path: Path of the PDF, passed to ``parser.from_file``.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts. Text before the
        first heading is filed under "Introduction". The list is empty when
        Tika yields no text.
    """
    logger.info("Extracting via Tika...")
    parsed = parser.from_file(pdf_path)
    # The "content" key can be present with a None value (e.g. image-only
    # PDFs), in which case dict.get's default is not used; coerce explicitly.
    raw_text = parsed.get("content") or ""

    sections = []
    title, chunks = "Introduction", []
    for line in raw_text.split("\n"):
        line = line.strip()
        if not line:
            continue
        is_heading = len(line) < 80 and (
            line.isupper()
            or line.endswith(':')
            or re.match(r'^[0-9]+\.?\s+[A-Z]', line)
        )
        if is_heading:
            # Close the running section only if it collected any body text.
            if chunks:
                sections.append({"title": title, "content": "".join(chunks)})
            title, chunks = line, []
        else:
            chunks.append(line + "\n\n")
    if chunks:
        sections.append({"title": title, "content": "".join(chunks)})
    return sections
# --- Gemini API calls --- | |
def generate_greg_brockman_summary(content):
    """Send ``content`` to Gemini with the document-analyst prompt.

    Args:
        content: Document text interpolated into the prompt.

    Returns:
        ``(text, None)`` on success, or ``(None, error_message)`` when the
        model cannot be created or the request fails.
    """
    prompt = (
        "\n"
        "You are an expert document analyst...\n"
        f"{content}\n"
    )
    try:
        # Model construction is inside the try so that its failures are also
        # reported through the (None, error) return value.
        model = genai.GenerativeModel('gemini-1.5-pro')
        resp = model.generate_content(prompt)
        return resp.text, None
    except Exception as e:
        # Boundary with an external service: log with traceback and hand the
        # message back to the caller instead of raising.
        logger.exception(e)
        return None, str(e)
def answer_question_about_pdf(content, question):
    """Ask Gemini to answer ``question`` using ``content`` as the document.

    Args:
        content: Document text interpolated into the prompt.
        question: The question interpolated after the document text.

    Returns:
        ``(text, None)`` on success, or ``(None, error_message)`` when the
        model cannot be created or the request fails.
    """
    prompt = (
        "\n"
        "You are a precise document analysis assistant...\n"
        "DOCUMENT CONTENT:\n"
        f"{content}\n"
        f"QUESTION: {question}\n"
    )
    try:
        # Model construction is inside the try so that its failures are also
        # reported through the (None, error) return value.
        model = genai.GenerativeModel('gemini-1.5-pro')
        resp = model.generate_content(prompt)
        return resp.text, None
    except Exception as e:
        # Boundary with an external service: log with traceback and hand the
        # message back to the caller instead of raising.
        logger.exception(e)
        return None, str(e)
# --- Processing & Q&A --- | |
def process_pdf(pdf_file, progress=gr.Progress()): | |
global EXTRACTED_TEXT, PDF_SECTIONS, EXTRACTION_METHOD | |
if not API_KEY: | |
return None, None, "β Set GOOGLE_API_KEY in settings.", "" | |
if pdf_file is None: | |
return None, None, "β No file uploaded.", "" | |
tmp = tempfile.gettempdir() | |
path = os.path.join(tmp, pdf_file.name) | |
with open(path, 'wb') as f: f.write(pdf_file.read()) | |
methods = [] | |
if UNSTRUCTURED_AVAILABLE: | |
methods.append(("unstructured", extract_text_with_unstructured)) | |
methods.extend([ | |
("pypdf", extract_text_with_pypdf), | |
("tika", extract_text_with_tika) | |
]) | |
with gr.Tab("Ask Questions"): | |
question = gr.Textbox(label="Question", lines=2) | |
ask_btn = gr.Button("Ask") | |
answer = gr.Textbox(label="Answer", lines=10) | |
ask_btn.click(ask_question, inputs=[question], outputs=[answer]) | |
with gr.Tab("System Log"): | |
refresh = gr.Button("Refresh Log") | |
syslog = gr.Textbox(label="System Log", lines=15) | |
refresh.click(view_log, inputs=None, outputs=[syslog]) | |
with gr.Row(): | |
save_sum_btn = gr.Button("Save Summary") | |
save_sum_status = gr.Markdown("") | |
save_sum_btn.click(save_summary, inputs=[summary_out], outputs=[save_sum_status]) | |
with gr.Row(): | |
save_qa_btn = gr.Button("Save Q&A") | |
save_qa_status = gr.Markdown("") | |
save_qa_btn.click(save_qa, inputs=[question, answer], outputs=[save_qa_status]) | |
if __name__ == "__main__":
    # For Hugging Face Spaces, set `server_name="0.0.0.0"` if needed
    app.launch()