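# NOTE: the next six lines are file-viewer page residue captured with the
# source; they are kept as comments because they are not program code.
# Omarrran's picture
# Update app.py
# e657d8c verified
# raw
# history blame
# 5.32 kB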
import os
import tempfile
import time
import re
import logging
from datetime import datetime
import gradio as gr
import google.generativeai as genai
from PyPDF2 import PdfReader
from tika import parser
# Logging setup: records are sent to the console and to a log file in the
# current working directory; the rest of the module logs through `logger`.
tmp_log = "pdf_processor_log.txt"
logging.basicConfig(
    handlers=[logging.StreamHandler(), logging.FileHandler(tmp_log)],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("pdf_processor")
# Optional dependency: Unstructured.io PDF partitioning. The flag records
# whether the import worked so callers can skip that extraction method.
try:
    from unstructured.partition.pdf import partition_pdf
except ImportError:
    UNSTRUCTURED_AVAILABLE = False
    logger.warning("unstructured.partition.pdf not available; skipping that extraction method")
else:
    UNSTRUCTURED_AVAILABLE = True
# Read the API key from the environment; configure google.generativeai only
# when a key is present, otherwise just log a warning.
API_KEY = os.environ.get("GOOGLE_API_KEY")
if API_KEY:
    genai.configure(api_key=API_KEY)
else:
    logger.warning("GOOGLE_API_KEY not set in environment.")
# Module-level state; process_pdf declares all three names as globals.
EXTRACTED_TEXT = EXTRACTION_METHOD = ""
PDF_SECTIONS = []
# --- Extraction Functions ---
def extract_text_with_unstructured(pdf_path):
    """Split a PDF into titled sections using ``partition_pdf``.

    Elements without a ``text`` attribute, or whose stripped text is empty,
    are ignored. An element shorter than 80 characters that is all upper-case,
    ends with ``:``, or starts with a number followed by whitespace opens a
    new section; any other element is appended to the current section,
    followed by a blank line. Text before the first heading is filed under
    "Introduction". Sections that never received content are dropped.

    Args:
        pdf_path: Path of the PDF file, passed to ``partition_pdf`` as
            ``filename``.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts in document order.
    """
    logger.info("Extracting via Unstructured.io...")
    sections = []
    title, body = "Introduction", []
    for element in partition_pdf(filename=pdf_path, extract_images_in_pdf=False):
        if not hasattr(element, "text"):
            continue
        text = element.text.strip()
        if not text:
            continue
        looks_like_heading = len(text) < 80 and (
            text.isupper()
            or text.endswith(':')
            or re.match(r'^[0-9]+\.?\s+', text)
        )
        if not looks_like_heading:
            body.append(text + "\n\n")
            continue
        # A heading closes the running section (only kept if it has content).
        if body:
            sections.append({"title": title, "content": "".join(body)})
        title, body = text, []
    if body:
        sections.append({"title": title, "content": "".join(body)})
    return sections
def extract_text_with_pypdf(pdf_path):
    """Split a PDF into titled sections using text extracted by PyPDF2.

    The text of every page that yields any is concatenated, each page
    preceded by a ``--- Page N ---`` marker. The result is split on lines
    that look like headings (an upper-case run, optionally ending in ``:``,
    or ``<number>. <Capitalised text>``).

    Args:
        pdf_path: Path of the PDF file handed to ``PdfReader``.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts in document order.
        When no heading is found, a single section titled "Document" holding
        the full text is returned. When headings are found, any real text
        that precedes the first heading is returned as a leading
        "Introduction" section instead of being discarded.
    """
    logger.info("Extracting via PyPDF2...")
    reader = PdfReader(pdf_path)
    page_texts = []
    for page_number, page in enumerate(reader.pages, 1):
        if (text := page.extract_text()):
            page_texts.append(f"\n\n--- Page {page_number} ---\n\n{text}")
    full = "".join(page_texts)

    # With one capture group, re.split returns
    # [before, title1, body1, title2, body2, ...] -- always an odd length.
    parts = re.split(r"\n\s*([A-Z][A-Z\s]+:?|[0-9]+\.\s+[A-Z].*?)\s*\n", full)
    if len(parts) == 1:
        # fallback to single section
        return [{"title": "Document", "content": full}]

    sections = []
    # parts[0] is everything before the first heading; keep it unless it
    # contains nothing but the page markers inserted above.
    preamble = parts[0].strip()
    if re.sub(r"--- Page [0-9]+ ---", "", preamble).strip():
        sections.append({"title": "Introduction", "content": preamble})
    sections.extend(
        {"title": heading.strip(), "content": body.strip()}
        for heading, body in zip(parts[1::2], parts[2::2])
    )
    return sections
def extract_text_with_tika(pdf_path):
    """Split a PDF into titled sections using text extracted by Tika.

    The extracted text is processed line by line and blank lines are ignored.
    A line shorter than 80 characters that is all upper-case, ends with
    ``:``, or starts with a number followed by whitespace and a capital
    letter opens a new section; every other line is appended to the current
    section, followed by a blank line. Text before the first heading is filed
    under "Introduction". Sections that never received content are dropped.

    Args:
        pdf_path: Path of the PDF file handed to ``parser.from_file``.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts in document order;
        empty when no text was extracted.
    """
    logger.info("Extracting via Tika...")
    parsed = parser.from_file(pdf_path)
    # dict.get's default only applies when the key is missing; if "content"
    # is present but None (no extractable text), fall back to "" so the
    # split below cannot raise AttributeError.
    raw_text = parsed.get("content") or ""

    sections = []
    title, body = "Introduction", []
    for line in raw_text.split("\n"):
        line = line.strip()
        if not line:
            continue
        is_heading = len(line) < 80 and (
            line.isupper()
            or line.endswith(':')
            or re.match(r'^[0-9]+\.?\s+[A-Z]', line)
        )
        if is_heading:
            # A heading closes the running section (only kept if non-empty).
            if body:
                sections.append({"title": title, "content": "".join(body)})
            title, body = line, []
        else:
            body.append(line + "\n\n")
    if body:
        sections.append({"title": title, "content": "".join(body)})
    return sections
# --- Gemini API calls ---
def generate_greg_brockman_summary(content):
    """Send ``content`` to the 'gemini-1.5-pro' model inside an analyst prompt.

    Args:
        content: Text interpolated into the prompt.

    Returns:
        ``(text, None)`` with the model's reply on success, or
        ``(None, message)`` with the stringified exception when the request
        or reading the reply fails (the exception is also logged).
    """
    gemini = genai.GenerativeModel('gemini-1.5-pro')
    prompt = f"""
You are an expert document analyst...
{content}
"""
    try:
        reply_text = gemini.generate_content(prompt).text
    except Exception as err:
        logger.error(err)
        return None, str(err)
    return reply_text, None
def answer_question_about_pdf(content, question):
    """Ask the 'gemini-1.5-pro' model a question about the given document text.

    Args:
        content: Document text interpolated into the prompt.
        question: Question interpolated into the prompt after the document.

    Returns:
        ``(text, None)`` with the model's reply on success, or
        ``(None, message)`` with the stringified exception when the request
        or reading the reply fails (the exception is also logged).
    """
    gemini = genai.GenerativeModel('gemini-1.5-pro')
    prompt = f"""
You are a precise document analysis assistant...
DOCUMENT CONTENT:
{content}
QUESTION: {question}
"""
    try:
        reply_text = gemini.generate_content(prompt).text
    except Exception as err:
        logger.error(err)
        return None, str(err)
    return reply_text, None
# --- Processing & Q&A ---
# Handle an uploaded PDF: check preconditions, copy the upload into the system
# temp directory, and build the ordered list of extraction methods.
# Both early exits return a 4-tuple: (None, None, <status message>, "").
# NOTE(review): the body looks truncated in this view -- `methods` is built but
# never used, `progress` is never referenced, and no final return is visible.
def process_pdf(pdf_file, progress=gr.Progress()):
global EXTRACTED_TEXT, PDF_SECTIONS, EXTRACTION_METHOD
# Refuse to run when no API key was found at import time.
if not API_KEY:
return None, None, "❌ Set GOOGLE_API_KEY in settings.", ""
if pdf_file is None:
return None, None, "❌ No file uploaded.", ""
# Write the uploaded bytes to <system temp dir>/<pdf_file.name>.
# NOTE(review): os.path.join returns its second argument unchanged when that
# argument is an absolute path; if pdf_file.name is absolute, open(..., 'wb')
# would truncate that very file before pdf_file.read() runs -- confirm what
# kind of object the upload component passes in.
tmp = tempfile.gettempdir()
path = os.path.join(tmp, pdf_file.name)
with open(path, 'wb') as f: f.write(pdf_file.read())
# Candidate extractors as (label, function) pairs; the Unstructured one is
# listed first, and only when its import succeeded.
methods = []
if UNSTRUCTURED_AVAILABLE:
methods.append(("unstructured", extract_text_with_unstructured))
methods.extend([
("pypdf", extract_text_with_pypdf),
("tika", extract_text_with_tika)
])
# NOTE(review): this is a fragment of the UI definition. The enclosing layout
# context and the names ask_question, view_log, save_summary, save_qa and
# summary_out are not visible in this view, and the original nesting of the
# rows below cannot be recovered because indentation was lost.
# "Ask Questions" tab: a question box, an Ask button and an answer box;
# clicking Ask calls ask_question with the question and shows its result.
with gr.Tab("Ask Questions"):
question = gr.Textbox(label="Question", lines=2)
ask_btn = gr.Button("Ask")
answer = gr.Textbox(label="Answer", lines=10)
ask_btn.click(ask_question, inputs=[question], outputs=[answer])
# "System Log" tab: the Refresh Log button fills the textbox via view_log,
# which takes no inputs.
with gr.Tab("System Log"):
refresh = gr.Button("Refresh Log")
syslog = gr.Textbox(label="System Log", lines=15)
refresh.click(view_log, inputs=None, outputs=[syslog])
# Row with a Save Summary button; save_summary receives summary_out and its
# result is shown in the adjacent Markdown status.
with gr.Row():
save_sum_btn = gr.Button("Save Summary")
save_sum_status = gr.Markdown("")
save_sum_btn.click(save_summary, inputs=[summary_out], outputs=[save_sum_status])
# Row with a Save Q&A button; save_qa receives the question and answer boxes
# and its result is shown in the adjacent Markdown status.
with gr.Row():
save_qa_btn = gr.Button("Save Q&A")
save_qa_status = gr.Markdown("")
save_qa_btn.click(save_qa, inputs=[question, answer], outputs=[save_qa_status])
# Script entry point: start the Gradio app only when run directly.
if __name__ == "__main__":
    # On Hugging Face Spaces, pass server_name="0.0.0.0" to launch() if needed.
    app.launch()