|
import gradio as gr |
|
import os |
|
import json |
|
import base64 |
|
import tempfile |
|
from pathlib import Path |
|
|
|
# Extractor names offered by the UI (annotation buttons and comparison
# dropdowns). Each name is looked up as a top-level key of the loaded JSON.
EXTRACTORS = [
    'pdf_plumber',
    'py_pdf',
    'docling',
    'extractous',
    'pypdfium2',
    'pymupdf',
    'pymupdf_llm',
]
|
|
|
def add_page_breaks(text, page_offsets):
    """Return ``text`` with a page-break marker inserted after each offset.

    Args:
        text: The extracted text to split.
        page_offsets: Character offsets into ``text``; the slice ending at each
            offset is followed by a marker. A falsy value (``None`` or an
            empty sequence) returns ``text`` unchanged.

    Returns:
        The text with ``"\\n<---page-break--->\\n"`` after every offset,
        followed by whatever text remains past the final offset.
    """
    if not page_offsets:
        return text

    marker = "\n<---page-break--->\n"

    # Slice boundaries: each page runs from the previous offset (0 for the
    # first page) up to its own offset.
    bounds = [0, *page_offsets]
    pieces = [text[start:stop] + marker for start, stop in zip(bounds, bounds[1:])]

    # Text after the last offset; an empty slice contributes nothing.
    pieces.append(text[bounds[-1]:])

    return "".join(pieces)
|
|
|
class ExtractorComparer:
    """Browse extraction-result files in a directory and record annotations.

    Each loaded file is parsed as JSON and kept in ``current_data``. The code
    reads it as a mapping from extractor name to that extractor's output,
    using these keys:

    * ``<extractor>['text']`` - the extracted text;
    * ``<extractor>['media'][0]['metadata']['pdf_metadata']['page_offsets']``
      - offsets used to insert page-break markers;
    * ``'pdf_plumber'['media'][0]['media_bytes']`` - the base64-encoded PDF.

    The chosen extractor for ``<name>.json`` / ``<name>.jsonl`` is stored next
    to it as ``<name>_best.txt``.
    """

    def __init__(self):
        self.json_files = []           # sorted paths of the result files in the loaded directory
        self.current_index = 0         # position of the displayed file within json_files
        self.current_data = None       # parsed content of the displayed file, None if not loaded
        self.temp_pdf_path = None      # temp file holding the displayed PDF, if any
        self.current_pdf_bytes = None  # raw bytes of the displayed PDF, if any

    @staticmethod
    def _annotation_path(json_path):
        """Return the path of the ``_best.txt`` file that annotates ``json_path``."""
        return os.path.splitext(json_path)[0] + "_best.txt"

    @staticmethod
    def _read_json_record(path):
        """Parse ``path`` and return its JSON content.

        A ``.jsonl`` file may hold several records; only the first one is
        returned, so multi-record files load instead of failing with
        "Extra data".

        Raises:
            OSError: if the file cannot be read.
            ValueError: if the content is not valid JSON (or not UTF-8).
        """
        with open(path, 'r', encoding='utf-8') as f:
            raw = f.read()
        if path.endswith('.jsonl'):
            # raw_decode stops after the first complete value and does not
            # skip leading whitespace itself, hence the lstrip().
            record, _ = json.JSONDecoder().raw_decode(raw.lstrip())
            return record
        return json.loads(raw)

    @staticmethod
    def _extract_pdf_bytes(data):
        """Return the PDF bytes stored under ``pdf_plumber``'s first media item.

        Returns ``None`` when any level of the expected structure is missing
        or the payload is not valid base64.
        """
        if not isinstance(data, dict):
            return None
        plumber_data = data.get('pdf_plumber')
        if not isinstance(plumber_data, dict):
            return None
        media = plumber_data.get('media')
        if not isinstance(media, list) or not media:
            return None
        media_item = media[0]
        if not isinstance(media_item, dict) or not media_item.get('media_bytes'):
            return None
        try:
            return base64.b64decode(media_item['media_bytes'])
        except (ValueError, TypeError):  # binascii.Error is a ValueError
            return None

    def _discard_temp_pdf(self):
        """Delete the temp PDF of the previously displayed file (best effort)."""
        if not self.temp_pdf_path:
            return
        try:
            os.remove(self.temp_pdf_path)
        except OSError:
            # Already gone or not removable; nothing useful to do about it.
            pass
        self.temp_pdf_path = None

    def load_files(self, directory_path):
        """Collect all ``.json`` / ``.jsonl`` files in ``directory_path``.

        Any previously loaded state is dropped first. Files are sorted so that
        the "File i of N" numbering is the same on every load.

        Returns:
            ``(file_progress, annotation_status)`` strings for the UI. On
            failure the first element carries the error message and the
            second is ``"Error"``.
        """
        self.json_files = []
        self.current_index = 0
        self.current_data = None
        self.current_pdf_bytes = None
        try:
            self.json_files = sorted(
                os.path.join(directory_path, filename)
                for filename in os.listdir(directory_path)
                if filename.endswith(('.json', '.jsonl'))
            )
            if not self.json_files:
                return "No JSON files found", "No files loaded"
            return self.get_progress_info()
        except Exception as e:  # UI callback boundary: report, never raise
            return f"Error loading files: {str(e)}", "Error"

    def load_current_file(self):
        """Load the file at ``current_index`` and expose its PDF.

        The previous document's data, PDF bytes and temp file are cleared
        before loading, so a failed load cannot leave stale content that
        would be displayed or annotated under the new file's name.

        Returns:
            ``(base64_pdf, file_progress, annotation_status)``. ``base64_pdf``
            is ``None`` when the file has no PDF or could not be loaded.
        """
        if not self.json_files:
            return None, "N/A", "N/A"

        self.current_data = None
        self.current_pdf_bytes = None
        self._discard_temp_pdf()

        try:
            self.current_data = self._read_json_record(self.json_files[self.current_index])

            base64_pdf = None
            pdf_bytes = self._extract_pdf_bytes(self.current_data)
            if pdf_bytes:
                self.current_pdf_bytes = pdf_bytes
                # delete=False: the file must outlive this `with`; it is
                # removed by _discard_temp_pdf() on the next load.
                with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
                    temp_file.write(pdf_bytes)
                    self.temp_pdf_path = temp_file.name
                base64_pdf = base64.b64encode(pdf_bytes).decode('utf-8')

            file_progress, annotation_status = self.get_progress_info()
            return base64_pdf, file_progress, annotation_status
        except Exception as e:  # UI callback boundary: report, never raise
            return None, f"Error loading file: {e}", "No annotation"

    def get_progress_info(self):
        """Describe the current position and annotation state.

        Returns:
            ``(file_progress, annotation_status)`` where ``file_progress`` is
            ``"File i of N: <name> (Annotated: k/N)"`` and
            ``annotation_status`` is ``"Best extractor: <name>"`` or
            ``"Not annotated"``.
        """
        if not self.json_files:
            return "No files loaded", "No annotation"

        current_file = self.json_files[self.current_index]
        total = len(self.json_files)

        annotation_status = "Not annotated"
        best_extractor_file = self._annotation_path(current_file)
        if os.path.exists(best_extractor_file):
            try:
                with open(best_extractor_file, 'r', encoding='utf-8') as f:
                    annotation_status = f"Best extractor: {f.read().strip()}"
            except (OSError, UnicodeDecodeError):
                # An unreadable annotation is shown as "Not annotated".
                pass

        annotated_count = sum(
            1 for json_file in self.json_files
            if os.path.exists(self._annotation_path(json_file))
        )

        file_progress = (
            f"File {self.current_index + 1} of {total}: {Path(current_file).name}"
            f" (Annotated: {annotated_count}/{total})"
        )
        return file_progress, annotation_status

    def get_extractor_text(self, extractor_name):
        """Return the text of ``extractor_name`` with page-break markers added.

        Returns an empty string when nothing is loaded or the extractor is
        absent from the current file, and ``"No text found for <name>"`` when
        the extractor entry has no ``'text'`` key.
        """
        data = self.current_data
        if not data or not isinstance(data, dict) or extractor_name not in data:
            return ""

        extractor_data = data[extractor_name]
        if not isinstance(extractor_data, dict) or 'text' not in extractor_data:
            return f"No text found for {extractor_name}"

        # A null text is treated as empty so slicing in add_page_breaks
        # cannot fail on None.
        text = extractor_data['text'] or ""

        page_offsets = []
        media = extractor_data.get('media')
        media_item = media[0] if isinstance(media, list) and media else None
        if isinstance(media_item, dict):
            metadata = media_item.get('metadata')
            pdf_metadata = metadata.get('pdf_metadata') if isinstance(metadata, dict) else None
            if isinstance(pdf_metadata, dict):
                page_offsets = pdf_metadata.get('page_offsets') or []

        return add_page_breaks(text, page_offsets)

    def _step(self, delta):
        """Move ``delta`` files forward/backward (wrapping around) and load."""
        if not self.json_files:
            return None, "N/A", "N/A"
        self.current_index = (self.current_index + delta) % len(self.json_files)
        return self.load_current_file()

    def next_pdf(self):
        """Load the next file, wrapping from the last back to the first."""
        return self._step(1)

    def prev_pdf(self):
        """Load the previous file, wrapping from the first to the last."""
        return self._step(-1)

    def set_best_extractor(self, extractor_name):
        """Record ``extractor_name`` as the best extractor for the current file.

        The name is written to the file's ``_best.txt`` sibling, replacing any
        earlier choice. Nothing is written unless a file is currently loaded.

        Returns:
            ``(file_progress, annotation_status)`` strings for the UI.
        """
        if not self.json_files or not self.current_data:
            return "N/A", "N/A"

        try:
            result_file = self._annotation_path(self.json_files[self.current_index])
            with open(result_file, 'w', encoding='utf-8') as f:
                f.write(extractor_name)
            return self.get_progress_info()
        except Exception as e:  # UI callback boundary: report, never raise
            return f"Error saving annotation: {e}", "No annotation"
|
|
|
def create_interface():
    """Build the Gradio Blocks UI for comparing PDF extractor outputs.

    The page has a directory input, an embedded PDF viewer, previous/next
    navigation, one annotation button per entry in ``EXTRACTORS`` and two
    side-by-side text boxes showing the output of two selectable extractors.

    The PDF reaches the browser as a base64 string in a hidden textbox. After
    every event that updates that textbox, a chained JavaScript step turns the
    string into a Blob URL and points the viewer iframe at it.

    A single ``ExtractorComparer`` is created here and closed over by every
    callback. NOTE(review): that state is presumably shared by all browser
    sessions connected to the app - confirm this tool is single-user.

    Returns:
        The assembled ``gr.Blocks`` app (not launched).
    """
    comparer = ExtractorComparer()

    custom_css = """
    .extraction-text textarea {
        font-family: Arial, Helvetica, sans-serif !important;
        font-size: 14px !important;
        line-height: 1.5 !important;
    }
    """

    # Gradio evaluates an event's `js` argument as a function expression and
    # calls it with the values of `inputs`, so this must be one function, not
    # a list of statements. It is chained after each update of the hidden
    # textbox rather than observing the DOM: assigning textarea.value is not a
    # DOM mutation, and window 'load' / 'DOMContentLoaded' have already fired
    # by the time Gradio runs custom JS.
    display_pdf_js = """
    (base64Data) => {
        const iframe = document.getElementById('pdf-iframe');
        const fallback = document.getElementById('pdf-fallback');
        if (!iframe || !fallback) {
            console.error('PDF viewer elements not found');
            return [];
        }

        // Release the Blob of the previously displayed document.
        if (window.__pdfComparerObjectUrl) {
            URL.revokeObjectURL(window.__pdfComparerObjectUrl);
            window.__pdfComparerObjectUrl = null;
        }

        // No PDF for this file: clear the viewer instead of leaving the
        // previous document on screen.
        if (!base64Data) {
            iframe.src = 'about:blank';
            fallback.textContent = 'No PDF available for this document.';
            fallback.style.display = 'flex';
            return [];
        }

        try {
            // Convert base64 to binary
            const binaryString = atob(base64Data);
            const bytes = new Uint8Array(binaryString.length);
            for (let i = 0; i < binaryString.length; i++) {
                bytes[i] = binaryString.charCodeAt(i);
            }

            // Create blob and URL, then show it
            const blob = new Blob([bytes], { type: 'application/pdf' });
            const objectUrl = URL.createObjectURL(blob);
            window.__pdfComparerObjectUrl = objectUrl;
            iframe.src = objectUrl;
            fallback.style.display = 'none';
        } catch (error) {
            console.error('Error displaying PDF:', error);
        }
        return [];
    }
    """

    def load_directory(directory_path):
        """Load a directory and its first file in one step.

        When no file could be loaded, the message from ``load_files`` is
        returned as-is so it is not overwritten by a follow-up "N/A".
        """
        file_progress, annotation_status = comparer.load_files(directory_path)
        if not comparer.json_files:
            return None, file_progress, annotation_status
        return comparer.load_current_file()

    def make_annotator(extractor_name):
        """Return a zero-argument callback that records ``extractor_name``."""
        def annotate():
            return comparer.set_best_extractor(extractor_name)
        return annotate

    with gr.Blocks(title="PDF Extractor Comparer", theme="soft", css=custom_css) as demo:
        gr.Markdown("## PDF Extractor Comparer")

        with gr.Row():
            directory_input = gr.Textbox(
                label="Path to JSON Directory",
                placeholder="e.g., /path/to/your/json/files"
            )
            load_button = gr.Button("Load PDFs", variant="primary")

        with gr.Row():
            # Left: PDF viewer.
            with gr.Column(scale=3):
                # NOTE(review): some browsers may refuse to render PDFs inside
                # an iframe carrying a `sandbox` attribute - confirm the viewer
                # works in the target browser before relying on it.
                pdf_viewer_html = gr.HTML(
                    label="PDF Document",
                    value='''
                    <div style="width:100%; height:700px; position:relative; border:1px solid #ddd;">
                        <style>
                            @font-face {
                                font-family: 'Local Arial';
                                src: local('Arial');
                            }
                            body {
                                font-family: 'Local Arial', sans-serif;
                            }
                        </style>
                        <meta http-equiv="Content-Security-Policy" content="default-src * blob:; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline';">
                        <iframe id="pdf-iframe" width="100%" height="100%" style="border:none;" src="about:blank" sandbox="allow-same-origin allow-scripts allow-forms"></iframe>
                        <div id="pdf-fallback" style="position:absolute; top:0; left:0; width:100%; height:100%;
                            display:flex; align-items:center; justify-content:center; padding:20px; text-align:center;">
                            Click "Load PDFs" to start viewing documents.
                        </div>
                    </div>
                    '''
                )

                # Carries the base64-encoded PDF from Python to the browser.
                pdf_data_hidden = gr.Textbox(visible=False, elem_id="pdf_base64_data")

            # Right: progress, navigation and annotation controls.
            with gr.Column(scale=1):
                file_progress_output = gr.Textbox(label="File Progress", interactive=False)
                annotation_status_output = gr.Textbox(label="Annotation Status", interactive=False)

                with gr.Row():
                    prev_button = gr.Button("⬅️ Previous", elem_id="prev_button")
                    next_button = gr.Button("Next ➡️", elem_id="next_button")

                gr.Markdown("### Select Best Extractor")
                for extractor in EXTRACTORS:
                    # The closure carries the extractor name, so no hidden
                    # input component is needed per button.
                    gr.Button(extractor, variant="secondary").click(
                        make_annotator(extractor),
                        outputs=[file_progress_output, annotation_status_output]
                    )

        gr.Markdown("### Extractor Comparison")

        with gr.Row():
            extractor1_dropdown = gr.Dropdown(
                choices=EXTRACTORS,
                label="Extractor 1",
                value=EXTRACTORS[0] if EXTRACTORS else None
            )
            extractor2_dropdown = gr.Dropdown(
                choices=EXTRACTORS,
                label="Extractor 2",
                value=EXTRACTORS[1] if len(EXTRACTORS) > 1 else EXTRACTORS[0] if EXTRACTORS else None
            )

        with gr.Row():
            extractor1_text = gr.Textbox(
                label="Extractor 1 Output",
                lines=15,
                elem_classes=["extraction-text"]
            )
            extractor2_text = gr.Textbox(
                label="Extractor 2 Output",
                lines=15,
                elem_classes=["extraction-text"]
            )

        document_outputs = [pdf_data_hidden, file_progress_output, annotation_status_output]

        def refresh_after(document_event):
            """Chain the viewer update and both text refreshes after an event
            that wrote ``document_outputs``."""
            return document_event.then(
                fn=None,
                inputs=[pdf_data_hidden],
                outputs=None,
                js=display_pdf_js
            ).then(
                comparer.get_extractor_text,
                inputs=[extractor1_dropdown],
                outputs=[extractor1_text]
            ).then(
                comparer.get_extractor_text,
                inputs=[extractor2_dropdown],
                outputs=[extractor2_text]
            )

        refresh_after(load_button.click(
            load_directory,
            inputs=[directory_input],
            outputs=document_outputs
        ))
        refresh_after(prev_button.click(comparer.prev_pdf, outputs=document_outputs))
        refresh_after(next_button.click(comparer.next_pdf, outputs=document_outputs))

        # Changing a dropdown only re-renders the matching text box.
        extractor1_dropdown.change(
            comparer.get_extractor_text,
            inputs=[extractor1_dropdown],
            outputs=[extractor1_text]
        )
        extractor2_dropdown.change(
            comparer.get_extractor_text,
            inputs=[extractor2_dropdown],
            outputs=[extractor2_text]
        )

    return demo
|
|
|
if __name__ == "__main__":
    # Script entry point: build the Blocks app and start the Gradio server
    # with its default launch settings.
    demo = create_interface()
    demo.launch()