Spaces:
Running
Running
from transformers import MllamaForConditionalGeneration, AutoProcessor, TextIteratorStreamer | |
from PIL import Image | |
import requests | |
import torch | |
from threading import Thread | |
import gradio as gr | |
from gradio import FileData | |
import time | |
import spaces | |
import fitz # PyMuPDF | |
import io | |
import numpy as np | |
# Load model and processor | |
ckpt = "Daemontatox/DocumentCogito" | |
model = MllamaForConditionalGeneration.from_pretrained(ckpt, torch_dtype=torch.bfloat16).to("cuda") | |
processor = AutoProcessor.from_pretrained(ckpt) | |
# Document state to track uploaded files | |
class DocumentState: | |
def __init__(self): | |
self.current_doc_images = [] | |
self.current_doc_text = "" | |
self.doc_type = None | |
def clear(self): | |
self.current_doc_images = [] | |
self.current_doc_text = "" | |
self.doc_type = None | |
doc_state = DocumentState() | |
# Function to convert PDF to images and extract text | |
def process_pdf_file(file_path): | |
"""Convert PDF to images and extract text using PyMuPDF.""" | |
doc = fitz.open(file_path) | |
images = [] | |
text = "" | |
# Process each page | |
for page_num in range(doc.page_count): | |
page = doc[page_num] | |
text += f"Page {page_num + 1} content:\n{page.get_text()}\n" | |
pix = page.get_pixmap(matrix=fitz.Matrix(300/72, 300/72)) | |
img_data = pix.tobytes("png") | |
img = Image.open(io.BytesIO(img_data)) | |
images.append(img.convert("RGB")) | |
doc.close() | |
return images, text | |
# Function to process uploaded files (PDF or image) | |
def process_file(file): | |
"""Process either PDF or image file and update document state.""" | |
doc_state.clear() | |
if isinstance(file, dict): | |
file_path = file["path"] | |
else: | |
file_path = file | |
if file_path.lower().endswith('.pdf'): | |
doc_state.doc_type = 'pdf' | |
doc_state.current_doc_images, doc_state.current_doc_text = process_pdf_file(file_path) | |
return f"PDF processed. Total pages: {len(doc_state.current_doc_images)}. You can now ask questions about the content." | |
else: | |
doc_state.doc_type = 'image' | |
doc_state.current_doc_images = [Image.open(file_path).convert("RGB")] | |
return "Image loaded successfully. You can now ask questions about the content." | |
# Function to handle streaming responses from the model | |
def bot_streaming(message, history, max_new_tokens=8192): | |
txt = message["text"] | |
messages = [] | |
# Process new file if provided | |
if message.get("files") and len(message["files"]) > 0: | |
process_file(message["files"][0]) | |
# Process history | |
for i, msg in enumerate(history): | |
if isinstance(msg[0], dict): # Multimodal message (text + files) | |
user_content = [{"type": "text", "text": msg[0]["text"]}] | |
if "files" in msg[0] and len(msg[0]["files"]) > 0: | |
user_content.append({"type": "image"}) | |
messages.append({"role": "user", "content": user_content}) | |
messages.append({"role": "assistant", "content": [{"type": "text", "text": msg[1]}]}) | |
elif isinstance(msg[0], str): # Text-only message | |
messages.append({"role": "user", "content": [{"type": "text", "text": msg[0]}]}) | |
messages.append({"role": "assistant", "content": [{"type": "text", "text": msg[1]}]}) | |
# Include document context in the current message | |
if doc_state.current_doc_images: | |
context = f"\nDocument context:\n{doc_state.current_doc_text}" if doc_state.current_doc_text else "" | |
current_msg = f"{txt}{context}" | |
messages.append({"role": "user", "content": [{"type": "text", "text": current_msg}, {"type": "image"}]}) | |
else: | |
messages.append({"role": "user", "content": [{"type": "text", "text": txt}]}) | |
# Apply chat template to messages | |
texts = processor.apply_chat_template(messages, add_generation_prompt=True) | |
# Process inputs based on whether we have images | |
if doc_state.current_doc_images: | |
inputs = processor( | |
text=texts, | |
images=doc_state.current_doc_images[0:1], # Only use first image | |
return_tensors="pt" | |
).to("cuda") | |
else: | |
inputs = processor(text=texts, return_tensors="pt").to("cuda") | |
streamer = TextIteratorStreamer(processor, skip_special_tokens=True, skip_prompt=True) | |
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=max_new_tokens) | |
thread = Thread(target=model.generate, kwargs=generation_kwargs) | |
thread.start() | |
buffer = "" | |
for new_text in streamer: | |
buffer += new_text | |
time.sleep(0.01) | |
yield buffer | |
# Function to clear document context | |
def clear_context(): | |
"""Clear the current document context.""" | |
doc_state.clear() | |
return "Document context cleared. You can upload a new document." | |
# Create the Gradio interface | |
with gr.Blocks() as demo: | |
gr.Markdown("# Document Analyzer with Chat Support") | |
gr.Markdown("Upload a PDF or image and chat about its contents. For PDFs, all pages will be processed for visual analysis.") | |
chatbot = gr.ChatInterface( | |
fn=bot_streaming, | |
title="Document Chat", | |
examples=[ | |
[{"text": "Which era does this piece belong to? Give details about the era.", "files":["./examples/rococo.jpg"]}, 200], | |
[{"text": "Where do the droughts happen according to this diagram?", "files":["./examples/weather_events.png"]}, 250], | |
[{"text": "What happens when you take out white cat from this chain?", "files":["./examples/ai2d_test.jpg"]}, 250], | |
[{"text": "How long does it take from invoice date to due date? Be short and concise.", "files":["./examples/invoice.png"]}, 250], | |
[{"text": "Where to find this monument? Can you give me other recommendations around the area?", "files":["./examples/wat_arun.jpg"]}, 250], | |
], | |
textbox=gr.MultimodalTextbox(), | |
additional_inputs=[ | |
gr.Slider( | |
minimum=10, | |
maximum=2048, | |
value=8192, | |
step=10, | |
label="Maximum number of new tokens to generate", | |
) | |
], | |
cache_examples=False, | |
stop_btn="Stop Generation", | |
fill_height=True, | |
multimodal=True | |
) | |
clear_btn = gr.Button("Clear Document Context") | |
clear_btn.click(fn=clear_context) | |
# Update accepted file types | |
chatbot.textbox.file_types = ["image", "pdf","text"] | |
# Launch the interface | |
demo.launch(debug=True) |