# Page-header residue captured with the source ("Spaces: Running"); not part of the program.
import os
import tempfile
import time
from threading import Thread

import gradio as gr
import requests
import spaces
import torch
from gradio import FileData
from pdf2image import convert_from_path
from PIL import Image
from PyPDF2 import PdfReader
from transformers import AutoProcessor, MllamaForConditionalGeneration, TextIteratorStreamer
# Checkpoint id shared by the model weights and the processor.
ckpt = "Daemontatox/DocumentCogito"

# Load the weights in bfloat16 first, then move the module to the GPU.
model = MllamaForConditionalGeneration.from_pretrained(
    ckpt,
    torch_dtype=torch.bfloat16,
)
model = model.to("cuda")

# The processor is built from the same checkpoint as the model.
processor = AutoProcessor.from_pretrained(ckpt)
def process_pdf(pdf_path):
    """Convert PDF pages to images and extract the PDF's text.

    Args:
        pdf_path: Path of the PDF file; passed unchanged to both
            ``convert_from_path`` and ``PdfReader``.

    Returns:
        A ``(images, text)`` tuple. ``images`` is the value returned by
        ``convert_from_path`` for the file. ``text`` is the concatenation of
        every page's ``extract_text()`` result, each followed by a newline
        (an empty string when the reader yields no pages).
    """
    images = convert_from_path(pdf_path)
    pdf_reader = PdfReader(pdf_path)
    # Build the text with a single join instead of growing a string per page.
    text = "".join(page.extract_text() + "\n" for page in pdf_reader.pages)
    return images, text
def is_pdf(file_path):
    """Check whether a path names a PDF file.

    Only the name is inspected (case-insensitively); the file's contents are
    never read.

    Args:
        file_path: A path string or any ``os.PathLike`` object.

    Returns:
        ``True`` when the path ends with ``.pdf``, otherwise ``False``.
    """
    # os.fspath leaves str untouched and unwraps pathlib.Path-like objects.
    return os.fspath(file_path).lower().endswith(".pdf")
def _user_turn_with_file(text, file_path):
    """Build the chat content and the image list for a user turn with one file."""
    if is_pdf(file_path):
        # Handle PDF: every rendered page becomes an image, and the extracted
        # text is appended to the user's own text.
        pdf_images, pdf_text = process_pdf(file_path)
        turn_images = list(pdf_images)
        text = f"{text}\nExtracted text from PDF:\n{pdf_text}"
    else:
        # Handle regular image
        turn_images = [Image.open(file_path).convert("RGB")]
    content = [{"type": "text", "text": text}]
    # One placeholder per image, so the number of image slots in the prompt
    # always equals the number of images handed to the processor.
    content.extend({"type": "image"} for _ in turn_images)
    return content, turn_images


def _history_to_messages(history):
    """Convert the chat history into chat-template messages plus their images."""
    messages = []
    images = []
    # True when the current entry was already merged into the preceding file turn.
    merged_into_previous = False
    for i, entry in enumerate(history):
        if merged_into_previous:
            merged_into_previous = False
            continue
        user_part, reply = entry[0], entry[1]
        if isinstance(user_part, tuple):
            # A file entry: its text and reply live in the following entry,
            # when there is one and it is not itself another file entry.
            text = ""
            has_follow_up = i + 1 < len(history)
            if has_follow_up and not isinstance(history[i + 1][0], tuple):
                text, reply = history[i + 1][0], history[i + 1][1]
                merged_into_previous = True
            content, turn_images = _user_turn_with_file(text or "", user_part[0])
            images.extend(turn_images)
            messages.append({"role": "user", "content": content})
            messages.append({"role": "assistant", "content": [{"type": "text", "text": reply or ""}]})
        elif isinstance(user_part, str):
            # Text-only turn.
            messages.append({"role": "user", "content": [{"type": "text", "text": user_part}]})
            messages.append({"role": "assistant", "content": [{"type": "text", "text": reply or ""}]})
    return messages, images


def bot_streaming(message, history, max_new_tokens=2048):
    """Stream the model's answer to a multimodal chat message.

    Args:
        message: Dict with a ``"text"`` entry and a ``"files"`` list. A file
            item is either a dict with a ``"path"`` key or the path itself.
            A file is only used when exactly one is attached.
        history: Earlier turns as ``[user, assistant]`` pairs. A pair whose
            first element is a tuple is treated as a file upload whose text
            and reply are stored in the next pair.
            NOTE(review): this matches the tuple-style history the original
            loop indexed into; confirm against the installed Gradio version.
        max_new_tokens: Upper bound on generated tokens, forwarded to
            ``model.generate``.

    Yields:
        The text generated so far, growing with every streamed chunk.
    """
    txt = message["text"]

    # Process history
    messages, images = _history_to_messages(history)

    # Process current message
    files = message["files"]
    if len(files) == 1:
        first_file = files[0]
        file_path = first_file["path"] if isinstance(first_file, dict) else first_file
        content, turn_images = _user_turn_with_file(txt, file_path)
        images.extend(turn_images)
    else:
        content = [{"type": "text", "text": txt}]
    messages.append({"role": "user", "content": content})

    texts = processor.apply_chat_template(messages, add_generation_prompt=True)
    if images:
        inputs = processor(text=texts, images=images, return_tensors="pt").to("cuda")
    else:
        inputs = processor(text=texts, return_tensors="pt").to("cuda")

    streamer = TextIteratorStreamer(processor, skip_special_tokens=True, skip_prompt=True)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=max_new_tokens)

    # Generation runs in a background thread so this generator can forward
    # chunks from the streamer as they arrive.
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text
        time.sleep(0.01)
        yield buffer
# Chat UI wired to bot_streaming. The slider value is passed as the
# function's third argument (max_new_tokens), and so is the number in each example.
demo = gr.ChatInterface(
    fn=bot_streaming,
    title="Document Analyzer",
    examples=[
        [{"text": "Which era does this piece belong to? Give details about the era.", "files":["./examples/rococo.jpg"]}, 200],
        [{"text": "Where do the droughts happen according to this diagram?", "files":["./examples/weather_events.png"]}, 250],
        [{"text": "What happens when you take out white cat from this chain?", "files":["./examples/ai2d_test.jpg"]}, 250],
        [{"text": "How long does it take from invoice date to due date? Be short and concise.", "files":["./examples/invoice.png"]}, 250],
        [{"text": "Where to find this monument? Can you give me other recommendations around the area?", "files":["./examples/wat_arun.jpg"]}, 250],
    ],
    # Accept images and PDFs. The extension form ".pdf" is the documented way
    # to name a specific file type, and it is set at construction time rather
    # than patched onto the component afterwards.
    textbox=gr.MultimodalTextbox(file_types=["image", ".pdf"]),
    additional_inputs=[
        gr.Slider(
            minimum=10,
            # Raised from 500 so the default of 2048 lies inside the slider range.
            maximum=2048,
            value=2048,
            step=10,
            label="Maximum number of new tokens to generate",
        )
    ],
    cache_examples=False,
    description="MllM Document and PDF Analyzer",
    stop_btn="Stop Generation",
    fill_height=True,
    multimodal=True,
)

demo.launch(debug=True)