# app.py — Multi-PDF Research Paper QA Assistant (Streamlit)
import io
import os
import requests
import streamlit as st
import concurrent.futures
from PyPDF2 import PdfReader, PdfWriter
import cv2
import numpy as np
from PIL import Image
from dotenv import load_dotenv
import json
import openai
import urllib3
from fpdf import FPDF
import tempfile
import base64
from functools import lru_cache
import time
# Disable SSL warnings (requests are made with verification disabled elsewhere)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Load environment variables from a local .env file
load_dotenv()
# Credentials and endpoints for the extraction and LLM services:
api_key = os.getenv("VISION_AGENT_API_KEY")   # used by agentic_doc (Vision Agent)
model_api_key = os.getenv("MODEL_API_KEY")    # API key for the chat model
model_url = os.getenv("MODEL_URL")            # OpenAI-compatible base URL
model_base = os.getenv("MODEL_BASE")          # model identifier passed to the API
os.makedirs("static", exist_ok=True)
# Import Agentic Document Extraction library
from agentic_doc.parse import parse_documents
#############################
# Convert a PDF to images (one per page) and return dimensions (in PDF points)
#############################
def pdf_to_images(pdf_file):
    """Render each page of a PDF file-like object to an RGB numpy array.

    Args:
        pdf_file: Binary file-like object (e.g. a Streamlit UploadedFile);
            its entire contents are consumed via ``read()``.

    Returns:
        Tuple ``(images, page_dims)`` where ``images`` is a list of numpy
        arrays (one per page, rendered at 200 DPI) and ``page_dims`` is a
        list of ``(width, height)`` tuples in PDF points. Both lists are
        empty (or partial) if conversion fails; errors are shown via
        ``st.error`` rather than raised.
    """
    images = []
    page_dims = []  # (pdf_width, pdf_height) per page
    pdf_document = None
    try:
        import fitz  # PyMuPDF

        pdf_document = fitz.open(stream=pdf_file.read(), filetype="pdf")
        for page in pdf_document:
            rect = page.rect
            page_dims.append((rect.width, rect.height))
            pix = page.get_pixmap(dpi=200)
            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
            images.append(np.array(img))
    except Exception as e:
        st.error(f"Error converting PDF to images: {e}")
    finally:
        # Release the document handle even if rendering fails mid-loop
        # (the original only closed it on the success path).
        if pdf_document is not None:
            pdf_document.close()
    return images, page_dims
#############################
# Convert an annotated image to a PDF file and return its path
#############################
def image_to_pdf(image):
    """Write a numpy RGB image onto a single A4 PDF page.

    Args:
        image: HxWx3 numpy array to embed.

    Returns:
        Path to a temporary PDF file; the caller owns (and should
        eventually delete) this file.
    """
    temp_img = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
    Image.fromarray(image).save(temp_img.name)
    temp_img.close()
    pdf = FPDF(unit="mm", format="A4")
    pdf.add_page()
    pdf.image(temp_img.name, x=0, y=0, w=210)  # A4 width is 210 mm; adjust if needed.
    temp_pdf = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False)
    pdf.output(temp_pdf.name)
    temp_pdf.close()
    # The intermediate PNG is only needed while building the PDF; remove it
    # so repeated calls don't accumulate temp files (the original leaked it).
    try:
        os.remove(temp_img.name)
    except OSError:
        pass
    return temp_pdf.name
#############################
# Display a PDF in an iframe
#############################
def display_pdf(pdf_path):
    """Embed the PDF at *pdf_path* into the Streamlit page as an iframe."""
    with open(pdf_path, "rb") as handle:
        encoded = base64.b64encode(handle.read()).decode('utf-8')
    iframe_html = f'<iframe src="data:application/pdf;base64,{encoded}" width="100%" height="600px"></iframe>'
    st.markdown(iframe_html, unsafe_allow_html=True)
#############################
# Optimized chunk matching for speed
#############################
@lru_cache(maxsize=128)
def calculate_scale_factors(img_width, img_height, pdf_width, pdf_height):
    """Return (scale_x, scale_y) mapping PDF points to image pixels.

    The scale preserves the aspect ratio; the 0.7 subtraction is a manual
    calibration adjustment carried over from the original tuning.
    """
    return (img_width / pdf_width - 0.7, img_height / pdf_height - 0.7)
def process_chunks_parallel(chunks_list, img, scale_factors, offset_x, offset_y, invert_y):
    """Draw a green rectangle on *img* for every 4-element bbox in *chunks_list*.

    Each PDF-space bbox [x1, y1, x2, y2] is scaled by *scale_factors*,
    optionally flipped vertically (*invert_y*), shifted by the offsets,
    clamped to the image bounds, and drawn with a 2px outline.
    Returns the (mutated) image.
    """
    height, width = img.shape[:2]
    scale_x, scale_y = scale_factors

    def to_pixel_box(bbox):
        # Map PDF coordinates into image pixel coordinates.
        left = int(bbox[0] * scale_x)
        right = int(bbox[2] * scale_x)
        if invert_y:
            top = int(height - (bbox[3] * scale_y))
            bottom = int(height - (bbox[1] * scale_y))
        else:
            top = int(bbox[1] * scale_y)
            bottom = int(bbox[3] * scale_y)
        # Apply offsets and clamp so boxes never leave the image.
        left = max(0, min(left + offset_x, width - 1))
        right = max(0, min(right + offset_x, width - 1))
        top = max(0, min(top + offset_y, height - 1))
        bottom = max(0, min(bottom + offset_y, height - 1))
        return left, top, right, bottom

    for chunk in chunks_list:
        for bbox in chunk.get("bboxes", []):
            if len(bbox) == 4:
                x1, y1, x2, y2 = to_pixel_box(bbox)
                cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
    return img
#############################
# Process a PDF using agentic_doc
#############################
def process_pdf_agentic(pdf_file):
    """Extract evidence chunks and rendered page images from an uploaded PDF.

    Args:
        pdf_file: Uploaded file-like object exposing ``.name``, ``.read()``
            and ``.seek()`` (e.g. a Streamlit UploadedFile).

    Returns:
        Tuple ``(evidence, page_images, page_dims)`` where ``evidence``
        maps ``"<filename>:<page>"`` to a list of chunk dicts (file, page,
        bboxes, captions, reason), or ``(None, None, None)`` on failure.
    """
    # Persist the upload to a temp file because agentic_doc parses paths.
    pdf_bytes = pdf_file.read()
    pdf_file.seek(0)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
        tmp_file.write(pdf_bytes)
        tmp_file_path = tmp_file.name
    try:
        # Parse the document using agentic_doc
        results = parse_documents([tmp_file_path])
        if not results:
            st.error(f"Gagal parse dokumen {pdf_file.name}, hasil parse kosong.")
            return None, None, None
        parsed_doc = results[0]
        st.write(f"Total chunks: {len(parsed_doc.chunks)}")
    except Exception as e:
        st.error(f"Error processing {pdf_file.name}: {e}")
        return None, None, None
    finally:
        # The temp copy is only needed for parsing; remove it either way
        # (the original never deleted it).
        try:
            os.remove(tmp_file_path)
        except OSError:
            pass

    # Build the evidence dictionary keyed by "<filename>:<page>".
    evidence = {}
    for chunk in parsed_doc.chunks:
        # Each chunk must carry grounding info (page number + bounding box).
        if not getattr(chunk, 'grounding', None):
            continue
        for ground in chunk.grounding:
            if ground.box is None:
                continue  # only keep evidence that has a bounding box
            bbox = [ground.box.l, ground.box.t, ground.box.r, ground.box.b]
            key = f"{pdf_file.name}:{ground.page}"
            evidence.setdefault(key, []).append({
                "file": pdf_file.name,
                "page": ground.page,
                "bboxes": [bbox],
                "captions": [chunk.text],
                "reason": "Extracted using agentic_doc"
            })

    # Render page images and collect their PDF-point dimensions for annotation.
    pdf_file.seek(0)
    page_images, page_dims = pdf_to_images(pdf_file)
    return evidence, page_images, page_dims
#############################
# Helper to get answer and best chunks from DeepSeek-V3
#############################
def get_answer_and_best_chunks(user_query, evidence):
    """Ask the chat model to answer *user_query* grounded in *evidence*.

    Sends the JSON evidence plus the question to the OpenAI-compatible
    endpoint and parses the model's JSON reply. Returns a dict with
    "answer", "reasoning" and "best_chunks"; on any failure an error is
    shown via ``st.error`` and a fallback dict is returned instead.
    """
    prompt = f"""
Use the following JSON evidence extracted from the uploaded PDF files, answer the following question based on that evidence.
Please return your response in JSON format with three keys:
1. "answer": Your detailed answer to the question
2. "reasoning": Your step-by-step reasoning process explaining how you arrived at the answer
3. "best_chunks": A list of objects that support your answer, where each object must include:
- "file": The filename where the evidence was found
- "page": The page number (1-indexed) where the evidence was found
- "bboxes": A list of bounding boxes, where each box is [x, y, w, h]
- "captions": A list of captions or text snippets corresponding to each bbox
- "reason": A detailed explanation of why these specific chunks support your answer and how they connect to your reasoning
Note: Most of the times, an answer spans multiple pages, multiple files, and have many bboxes and captions associated with it, verify if the overall answer and reasoning is derived from the best_chunks selected from the evidence without missing any chunk and don't skip returning all associated chunks with all relevant bboxes and captions.
Question: {user_query}
Evidence: {evidence}
"""
    try:
        client = openai.OpenAI(
            api_key=model_api_key,
            base_url=model_url,
        )
        completion = client.chat.completions.create(
            model=model_base,
            messages=[
                {"role": "system", "content": "You are a helpful expert that analyses the context deeply and reasons through it without assuming anything to provide a detailed and accurate answer."},
                {"role": "user", "content": prompt},
            ],
            temperature=0.1,
            top_p=0.1
        )
        reply = completion.choices[0].message.content.strip()
        # Strip surrounding markdown code fences if the model added them.
        if reply.startswith("```"):
            fence_lines = reply.splitlines()[1:]  # drop the opening fence
            if fence_lines and fence_lines[-1].startswith("```"):
                fence_lines = fence_lines[:-1]    # drop the closing fence
            reply = "\n".join(fence_lines).strip()
        return json.loads(reply)
    except Exception as e:
        st.error(f"Error getting answer: {e}")
        return {
            "answer": "Sorry, I could not retrieve an answer.",
            "reasoning": "An error occurred during processing.",
            "best_chunks": []
        }
#############################
# Main Application
#############################
st.title("Multi-PDF Research Paper QA Assistant")
st.markdown("""
Upload one or more PDFs. The app will extract structured evidence (text chunks with bounding boxes) from each PDF using the Agentic Document Extraction tool.
You can then ask a question, and the system will answer using a DeepSeek-V3 model – displaying only the pages (as PDFs) where supporting evidence was found.
""")

# Allow uploading more than one PDF.
uploaded_pdfs = st.file_uploader("Upload PDF files", type=["pdf"], accept_multiple_files=True)

# Show PDF previews in collapsible sections while processing runs.
if uploaded_pdfs:
    st.markdown("### PDF Previews")
    st.info("You can preview each PDF while the system processes them. Click to expand/collapse.")
    for pdf_file in uploaded_pdfs:
        with st.expander(f"Preview: {pdf_file.name}"):
            pdf_bytes = pdf_file.read()
            base64_pdf = base64.b64encode(pdf_bytes).decode('utf-8')
            pdf_display = f'<iframe src="data:application/pdf;base64,{base64_pdf}" width="100%" height="600px"></iframe>'
            st.markdown(pdf_display, unsafe_allow_html=True)
            pdf_file.seek(0)  # rewind so later processing can re-read the file

if uploaded_pdfs:
    # Only reprocess when the set of uploaded PDFs has changed.
    current_pdfs = {pdf.name: pdf for pdf in uploaded_pdfs}
    if ("processed_pdfs" not in st.session_state or
            current_pdfs.keys() != st.session_state.processed_pdfs.keys()):
        with st.spinner("Processing new PDFs..."):
            all_evidence = {}
            all_images = {}
            all_page_dims = {}
            all_total_pages = {}
            # Process each PDF with agentic_doc.
            for pdf_file in uploaded_pdfs:
                filename = pdf_file.name
                # FIX: original logged a literal "(unknown)" instead of the filename.
                st.write(f"Processing {filename}...")
                evidence, page_images, page_dims = process_pdf_agentic(pdf_file)
                if evidence is None:
                    continue
                all_evidence.update(evidence)
                all_images[filename] = page_images
                all_page_dims[filename] = page_dims
                all_total_pages[filename] = len(page_images)
            # Cache results in session state so reruns skip reprocessing.
            st.session_state.all_evidence = all_evidence
            st.session_state.all_images = all_images
            st.session_state.all_page_dims = all_page_dims
            st.session_state.all_total_pages = all_total_pages
            st.session_state.processed_pdfs = current_pdfs
            st.success("PDF processing complete!")
    else:
        # Reuse previously processed data from session state.
        all_evidence = st.session_state.all_evidence
        all_images = st.session_state.all_images
        all_page_dims = st.session_state.all_page_dims
        all_total_pages = st.session_state.all_total_pages
        st.markdown("### Extraction Complete")

    # --- Chat Interface ---
    st.markdown("## Ask a Question")
    user_query = st.chat_input("Enter your question about the PDFs:")
    if user_query:
        if "chat_history" not in st.session_state:
            st.session_state.chat_history = []
        st.session_state.chat_history.append({"role": "user", "content": user_query})
        start_time = time.time()
        with st.spinner("Analyzing your question..."):
            # Combine evidence from all files that produced any evidence.
            filtered_evidence = {k: v for k, v in all_evidence.items() if v}
            combined_evidence = json.dumps(filtered_evidence, indent=2)
            # Get the answer and the supporting chunks from the model.
            result_json = get_answer_and_best_chunks(user_query, combined_evidence)
            answer = result_json.get("answer", "No answer provided.")
            reasoning = result_json.get("reasoning", "No reasoning provided.")
            best_chunks = result_json.get("best_chunks", [])
        st.session_state.chat_history.append({"role": "assistant", "content": answer})

        tab1, tab2 = st.tabs(["Current Q&A", "Chat History"])
        with tab1:
            st.chat_message("user").write(user_query)
            st.chat_message("assistant").write(answer)
            if best_chunks:
                # Group best chunks by "<file>:<page>" so each page is annotated once.
                matched = {}
                for chunk in best_chunks:
                    key = f"{chunk.get('file')}:{chunk.get('page')}"
                    matched.setdefault(key, []).append(chunk)
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future_to_key = {}
                    for comp_key, chunks_list in matched.items():
                        try:
                            filename = comp_key.split(':')[0]
                            page_num = int(comp_key.split(':')[1])
                            page_idx = page_num - 1
                            if filename in all_images and page_idx < len(all_images[filename]):
                                # FIX: original printed "(unknown)" instead of the filename.
                                st.write(f"Processing {filename} - Page {page_num} (index: {page_idx})")
                                img = all_images[filename][page_idx].copy()
                                img_height, img_width = img.shape[:2]
                                pdf_width, pdf_height = all_page_dims[filename][page_idx]
                                scale_factors = calculate_scale_factors(
                                    img_width, img_height,
                                    pdf_width, pdf_height
                                )
                                future = executor.submit(
                                    process_chunks_parallel,
                                    chunks_list,
                                    img,
                                    scale_factors,
                                    0,
                                    0,
                                    False
                                )
                                future_to_key[future] = (comp_key, filename, page_num)
                            else:
                                # FIX: original said "(unknown)" instead of the filename.
                                st.warning(f"Page index {page_idx+1} tidak valid untuk file {filename}.")
                        except Exception as e:
                            st.error(f"Error processing {comp_key}: {e}")
                            continue
                    for future in concurrent.futures.as_completed(future_to_key):
                        comp_key, filename, page_num = future_to_key[future]
                        try:
                            annotated_img = future.result()
                            annotated_pdf_path = image_to_pdf(annotated_img)
                            # FIX: original said "(unknown)" instead of the filename.
                            st.markdown(f"**Matched Page {page_num} from {filename}**")
                            display_pdf(annotated_pdf_path)
                        except Exception as e:
                            st.warning(f"Failed to process {comp_key}: {e}")
                st.markdown("### Answer Analysis and Supporting Evidence")
                st.markdown("**Why these chunks were selected:**")
                st.write(reasoning)
                st.markdown("\n**Supporting Evidence:**")
                for chunk in best_chunks:
                    st.markdown(f"πŸ“„ **{chunk.get('file')} - Page {chunk.get('page')}**")
                    captions = chunk.get('captions', [])
                    if captions:
                        st.markdown("**Text:**")
                        for caption in captions:
                            st.markdown(f"- {caption}")
                    st.markdown(f"**Why this supports the answer:** {chunk.get('reason')}")
                    st.markdown("---")
                total_time = round(time.time() - start_time, 2)
                st.success(f"Thought process complete! (took {total_time} seconds)")
            else:
                total_time = round(time.time() - start_time, 2)
                st.error(f"No supporting evidence found (took {total_time} seconds)")
                st.info("No supporting chunks identified.")
        with tab2:
            st.markdown("### Complete Chat History")
            for chat in st.session_state.chat_history:
                if chat["role"] == "user":
                    st.chat_message("user").write(chat["content"])
                else:
                    st.chat_message("assistant").write(chat["content"])
else:
    st.info("Please upload one or more PDF files.")