Spaces:
Sleeping
Sleeping
import os | |
import tempfile | |
import time | |
import re | |
import logging | |
from datetime import datetime | |
import gradio as gr | |
import google.generativeai as genai | |
from PyPDF2 import PdfReader | |
from tika import parser | |
from unstructured.partition.pdf import partition_pdf | |
# Configure logging: every record goes to the console and to a local log file
# (the same file that view_log() reads back for the "System Log" tab).
tmp_log = "pdf_processor_log.txt"
logging.basicConfig(
    handlers=[logging.StreamHandler(), logging.FileHandler(tmp_log)],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("pdf_processor")
# Load API key from environment; the Gemini client is configured only when a
# key is present, otherwise a warning is logged and the UI callbacks refuse
# to run.
API_KEY = os.environ.get("GOOGLE_API_KEY")
if API_KEY:
    genai.configure(api_key=API_KEY)
else:
    logger.warning("GOOGLE_API_KEY not set in environment.")
# Globals to store state shared between the processing and Q&A callbacks.
EXTRACTION_METHOD = ""  # name of the extractor that produced PDF_SECTIONS
PDF_SECTIONS = []       # list of {"title": ..., "content": ...} dicts
EXTRACTED_TEXT = ""     # combined section text handed to the model
# --- Extraction Functions --- | |
def extract_text_with_unstructured(pdf_path):
    """Split a PDF into titled sections using ``partition_pdf``.

    An element shorter than 80 characters that is all upper-case, ends with
    a colon, or starts with a number followed by whitespace is treated as a
    heading; everything else is appended to the current section's content.
    Text seen before the first heading is filed under "Introduction".

    Args:
        pdf_path: Path of the PDF file to partition.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts. Sections whose
        content is empty are not included.
    """
    logger.info("Extracting via Unstructured.io...")
    numbered = re.compile(r'^[0-9]+\.?\s+')

    def looks_like_heading(text):
        if len(text) >= 80:
            return False
        return bool(text.isupper() or text.endswith(':') or numbered.match(text))

    sections = []
    title, chunks = "Introduction", []
    for element in partition_pdf(filename=pdf_path, extract_images_in_pdf=False):
        if not hasattr(element, "text"):
            continue
        text = element.text.strip()
        if not text:
            continue
        if looks_like_heading(text):
            # Close the running section (only if it collected any text).
            if chunks:
                sections.append({"title": title, "content": "".join(chunks)})
            title, chunks = text, []
        else:
            chunks.append(text + "\n\n")
    if chunks:
        sections.append({"title": title, "content": "".join(chunks)})
    return sections
def extract_text_with_pypdf(pdf_path):
    """Extract text with PyPDF2 and split it into titled sections.

    Each page that yields text is prefixed with a ``--- Page N ---`` marker.
    The combined text is then split on lines that look like headings
    (an upper-case line, optionally ending in a colon, or a numbered line
    such as ``1. Title``).

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts. Returns an empty
        list when no page produced any text, so that the caller can fall
        back to another extractor. When no heading is found the whole text
        is returned as a single "Document" section.
    """
    logger.info("Extracting via PyPDF2...")
    reader = PdfReader(pdf_path)
    pages = [
        f"\n\n--- Page {number} ---\n\n{text}"
        for number, page in enumerate(reader.pages, 1)
        if (text := page.extract_text()) and text.strip()
    ]
    if not pages:
        # Nothing extractable (e.g. a scanned PDF): report failure instead of
        # a truthy but empty section.
        return []
    full = "".join(pages)

    # One capture group => [preamble, title1, body1, title2, body2, ...].
    parts = re.split(r"\n\s*([A-Z][A-Z\s]+:?|[0-9]+\.\s+[A-Z].*?)\s*\n", full)
    if len(parts) == 1:
        # fallback to single section
        return [{"title": "Document", "content": full}]

    sections = []
    preamble = parts[0].strip()
    # Keep text that precedes the first heading, unless it consists solely of
    # the page markers inserted above.
    if re.sub(r"--- Page \d+ ---", "", preamble).strip():
        sections.append({"title": "Introduction", "content": preamble})
    sections.extend(
        {"title": title.strip(), "content": body.strip()}
        for title, body in zip(parts[1::2], parts[2::2])
    )
    return sections
def extract_text_with_tika(pdf_path):
    """Extract text with Apache Tika and split it into titled sections.

    A non-empty line shorter than 80 characters that is all upper-case, ends
    with a colon, or starts with a number followed by whitespace and a
    capital letter is treated as a heading; other lines are appended to the
    current section. Text before the first heading is filed under
    "Introduction".

    Args:
        pdf_path: Path of the PDF file to parse.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts (empty when the
        parser returned no text). Sections with empty content are dropped.
    """
    logger.info("Extracting via Tika...")
    parsed = parser.from_file(pdf_path)
    # The "content" key may be present with a None value; dict.get's default
    # only applies to a missing key, so coerce explicitly.
    raw = parsed.get("content") or ""

    sections = []
    title, chunks = "Introduction", []
    for ln in raw.split("\n"):
        ln = ln.strip()
        if not ln:
            continue
        is_heading = len(ln) < 80 and (
            ln.isupper() or ln.endswith(':') or re.match(r'^[0-9]+\.?\s+[A-Z]', ln)
        )
        if is_heading:
            if chunks:
                sections.append({"title": title, "content": "".join(chunks)})
            title, chunks = ln, []
        else:
            chunks.append(ln + "\n\n")
    if chunks:
        sections.append({"title": title, "content": "".join(chunks)})
    return sections
# --- Gemini API calls --- | |
def generate_greg_brockman_summary(content):
    """Ask Gemini for a summary of the given document text.

    Args:
        content: Document text embedded verbatim in the prompt.

    Returns:
        ``(summary_text, None)`` on success, or ``(None, error_message)``
        when building the model or the request raises.
    """
    prompt = f"""
You are an expert document analyst...
{content}
"""
    try:
        # Model construction is inside the try so that any failure is
        # reported through the (None, error) return like request failures.
        model = genai.GenerativeModel('gemini-1.5-pro')
        resp = model.generate_content(prompt)
        return resp.text, None
    except Exception as e:  # boundary: surface every failure to the UI
        logger.exception("Gemini summary request failed")
        return None, str(e)
def answer_question_about_pdf(content, question):
    """Ask Gemini a question about the given document text.

    Args:
        content: Document text embedded verbatim in the prompt.
        question: The user's question, appended after the document text.

    Returns:
        ``(answer_text, None)`` on success, or ``(None, error_message)``
        when building the model or the request raises.
    """
    prompt = f"""
You are a precise document analysis assistant...
DOCUMENT CONTENT:
{content}
QUESTION: {question}
"""
    try:
        # Model construction is inside the try so that any failure is
        # reported through the (None, error) return like request failures.
        model = genai.GenerativeModel('gemini-1.5-pro')
        resp = model.generate_content(prompt)
        return resp.text, None
    except Exception as e:  # boundary: surface every failure to the UI
        logger.exception("Gemini question request failed")
        return None, str(e)
# --- Processing & Q&A --- | |
# Upper bound (in characters) on the combined text sent to the model.
_MAX_COMBINED_CHARS = 30000


def _save_upload_to_temp(pdf_file):
    """Write the uploaded PDF to a fresh temporary file and return its path.

    Accepts raw bytes, a filesystem path, or any object exposing ``read()``.
    """
    if isinstance(pdf_file, (bytes, bytearray, memoryview)):
        data = bytes(pdf_file)
    elif isinstance(pdf_file, (str, os.PathLike)):
        with open(pdf_file, 'rb') as src:
            data = src.read()
    else:
        data = pdf_file.read()
    # mkstemp picks a unique name inside the temp dir, so a client-supplied
    # file name can never steer the write to another location.
    fd, path = tempfile.mkstemp(suffix=".pdf")
    with os.fdopen(fd, 'wb') as out:
        out.write(data)
    return path


def process_pdf(pdf_file, progress=gr.Progress()):
    """Extract a PDF's sections, build the combined text and summarise it.

    Extractors are tried in order (unstructured, pypdf, tika); the first one
    that returns a non-empty section list wins. The module-level
    ``EXTRACTED_TEXT``, ``PDF_SECTIONS`` and ``EXTRACTION_METHOD`` are reset
    and then updated with the result.

    Args:
        pdf_file: The upload: raw bytes, a path, or an object with ``read()``.
        progress: Gradio progress tracker (currently unused).

    Returns:
        A 4-tuple ``(summary, structure, status, info)`` matching the four
        output components wired to this callback.
    """
    global EXTRACTED_TEXT, PDF_SECTIONS, EXTRACTION_METHOD
    if not API_KEY:
        return None, None, "β Set GOOGLE_API_KEY in settings.", ""
    if pdf_file is None:
        return None, None, "β No file uploaded.", ""

    # Forget the previous document so a failed run cannot silently reuse its
    # sections or leave its text available to the Q&A tab.
    EXTRACTED_TEXT, PDF_SECTIONS, EXTRACTION_METHOD = "", [], ""

    methods = [("unstructured", extract_text_with_unstructured),
               ("pypdf", extract_text_with_pypdf),
               ("tika", extract_text_with_tika)]
    path = _save_upload_to_temp(pdf_file)
    try:
        for name, fn in methods:
            try:
                secs = fn(path)
            except Exception:
                # Best-effort chain: record why this extractor failed and
                # move on to the next one.
                logger.exception("Extraction via %s failed", name)
                continue
            if secs:
                EXTRACTION_METHOD = name
                PDF_SECTIONS = secs
                break
    finally:
        try:
            os.remove(path)
        except OSError:
            logger.warning("Could not remove temporary file %s", path)

    if not PDF_SECTIONS:
        return None, None, "β Extraction failed.", ""

    outline, blocks, size = [], [], 0
    for i, sec in enumerate(PDF_SECTIONS, 1):
        outline.append(f"{i}. {sec['title']}\n")
        block = f"## {sec['title']}\n{sec['content']}\n\n"
        # Once the budget would be reached, keep only the section heading.
        if size + len(block) >= _MAX_COMBINED_CHARS:
            block = f"## {sec['title']}\n[Truncated]\n\n"
        blocks.append(block)
        size += len(block)
    struct = "".join(outline)
    combined = "".join(blocks)

    EXTRACTED_TEXT = combined
    summary, err = generate_greg_brockman_summary(combined)
    if err:
        return None, struct, f"β {err}", combined
    return summary, struct, "β Done", f"Used {EXTRACTION_METHOD}, {len(PDF_SECTIONS)} sections"
def ask_question(question):
    """Answer a question about the most recently processed PDF.

    Args:
        question: The question text entered by the user.

    Returns:
        The model's answer, or a status message when the API key is missing,
        no PDF has been processed, the question is blank, or the request
        failed.
    """
    if not API_KEY:
        return "β Set GOOGLE_API_KEY."
    if not EXTRACTED_TEXT:
        return "β Process a PDF first."
    if not question.strip():
        return "β Enter a question."

    reply, error = answer_question_about_pdf(EXTRACTED_TEXT, question)
    if error:
        return f"β {error}"
    return reply
def view_log():
    """Return the contents of the log file at ``tmp_log``.

    Returns:
        The log text, or "Error reading log." when the file cannot be
        opened or decoded.
    """
    try:
        # Context manager so the handle is closed after every refresh.
        with open(tmp_log) as log_file:
            return log_file.read()
    except (OSError, UnicodeError):
        return "Error reading log."
def save_summary(summary):
    """Write the summary to a timestamped text file in the working directory.

    Args:
        summary: Summary text to save; falsy values are rejected.

    Returns:
        A status message naming the file written, or a notice that there is
        no summary to save.
    """
    if not summary:
        return "β No summary."
    fn = f"summary_{datetime.now():%Y%m%d_%H%M%S}.txt"
    # Use a context manager so the file is flushed and closed deterministically.
    with open(fn, 'w', encoding='utf-8') as f:
        f.write(summary)
    return f"β Saved to {fn}"
def save_qa(question, answer):
    """Write a question/answer pair to a timestamped text file.

    Args:
        question: The question text.
        answer: The answer text.

    Returns:
        A status message naming the file written, or a notice that the pair
        is incomplete when either value is falsy.
    """
    if not (question and answer):
        return "β Incomplete Q&A."
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    fn = "qa_" + stamp + ".txt"
    with open(fn, mode='w', encoding='utf-8') as out:
        out.write(f"Q: {question}\n\nA: {answer}")
    return f"β Saved to {fn}"
# --- Gradio UI ---
# Three tabs (processing, Q&A, log viewer) followed by two rows of save
# buttons. NOTE(review): the original indentation was lost; the nesting below
# (event wiring inside each tab, save rows at the Blocks level) is the assumed
# layout — confirm against the deployed app.
with gr.Blocks(title="PDF Analyzer with Gemini API") as app:
    gr.Markdown("# π PDF Analyzer with Gemini API")
    gr.Markdown("Upload a PDF, get a summary, ask questions.")
    with gr.Tab("PDF Processing"):
        # NOTE(review): type="binary" presumably hands the callback raw bytes
        # rather than a file object — verify process_pdf accepts that.
        pdf_file = gr.File(label="Upload PDF", file_types=[".pdf"], type="binary")
        process_btn = gr.Button("Process PDF")
        summary_out = gr.Textbox(label="Summary", lines=15)
        struct_out = gr.Textbox(label="Structure", lines=8)
        status = gr.Markdown("")
        log_out = gr.Textbox(label="Log", lines=8)
        # process_pdf returns a 4-tuple, mapped positionally onto these outputs.
        process_btn.click(process_pdf, inputs=[pdf_file],
                          outputs=[summary_out, struct_out, status, log_out])
    with gr.Tab("Ask Questions"):
        question = gr.Textbox(label="Question", lines=2)
        ask_btn = gr.Button("Ask")
        answer = gr.Textbox(label="Answer", lines=10)
        ask_btn.click(ask_question, inputs=[question], outputs=[answer])
    with gr.Tab("System Log"):
        refresh = gr.Button("Refresh Log")
        syslog = gr.Textbox(label="System Log", lines=15)
        # view_log takes no arguments, hence inputs=None.
        refresh.click(view_log, inputs=None, outputs=[syslog])
    with gr.Row():
        save_sum_btn = gr.Button("Save Summary")
        save_sum_status = gr.Markdown("")
        save_sum_btn.click(save_summary, inputs=[summary_out], outputs=[save_sum_status])
    with gr.Row():
        save_qa_btn = gr.Button("Save Q&A")
        save_qa_status = gr.Markdown("")
        save_qa_btn.click(save_qa, inputs=[question, answer], outputs=[save_qa_status])
# Start the Gradio server only when this file is run as a script.
if __name__ == "__main__":
    # For Hugging Face Spaces, set `server_name="0.0.0.0"` if needed
    app.launch()