Spaces:
Running
Running
import os | |
import io | |
import time | |
import base64 | |
import logging | |
import fitz # PyMuPDF | |
from PIL import Image | |
import gradio as gr | |
from openai import OpenAI # Use the OpenAI client that supports multimodal messages | |
# Load API key from environment variable (secrets) | |
HF_API_KEY = os.getenv("OPENAI_TOKEN") | |
if not HF_API_KEY: | |
raise ValueError("HF_API_KEY environment variable not set") | |
# Create the client pointing to the Hugging Face Inference endpoint | |
client = OpenAI( | |
base_url="https://openrouter.ai/api/v1", | |
api_key=HF_API_KEY | |
) | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# ------------------------------- | |
# Document State and File Processing | |
# ------------------------------- | |
class DocumentState: | |
def __init__(self): | |
self.current_doc_images = [] | |
self.current_doc_text = "" | |
self.doc_type = None | |
def clear(self): | |
self.current_doc_images = [] | |
self.current_doc_text = "" | |
self.doc_type = None | |
doc_state = DocumentState() | |
def process_pdf_file(file_path): | |
"""Convert PDF pages to images and extract text using PyMuPDF.""" | |
try: | |
doc = fitz.open(file_path) | |
images = [] | |
text = "" | |
for page_num in range(doc.page_count): | |
try: | |
page = doc[page_num] | |
page_text = page.get_text("text") | |
if page_text.strip(): | |
text += f"Page {page_num + 1}:\n{page_text}\n\n" | |
# Render page as an image with a zoom factor | |
zoom = 3 | |
mat = fitz.Matrix(zoom, zoom) | |
pix = page.get_pixmap(matrix=mat, alpha=False) | |
img_data = pix.tobytes("png") | |
img = Image.open(io.BytesIO(img_data)).convert("RGB") | |
# Resize if image is too large | |
max_size = 1600 | |
if max(img.size) > max_size: | |
ratio = max_size / max(img.size) | |
new_size = tuple(int(dim * ratio) for dim in img.size) | |
img = img.resize(new_size, Image.Resampling.LANCZOS) | |
images.append(img) | |
except Exception as e: | |
logger.error(f"Error processing page {page_num}: {str(e)}") | |
continue | |
doc.close() | |
if not images: | |
raise ValueError("No valid images could be extracted from the PDF") | |
return images, text | |
except Exception as e: | |
logger.error(f"Error processing PDF file: {str(e)}") | |
raise | |
def process_uploaded_file(file): | |
"""Process an uploaded file (PDF or image) and update document state.""" | |
try: | |
doc_state.clear() | |
if file is None: | |
return "No file uploaded. Please upload a file." | |
# Get the file path from the Gradio upload (may be a dict or file-like object) | |
if isinstance(file, dict): | |
file_path = file["name"] | |
else: | |
file_path = file.name | |
file_ext = file_path.lower().split('.')[-1] | |
image_extensions = {'png', 'jpg', 'jpeg', 'gif', 'bmp', 'webp'} | |
if file_ext == 'pdf': | |
doc_state.doc_type = 'pdf' | |
try: | |
doc_state.current_doc_images, doc_state.current_doc_text = process_pdf_file(file_path) | |
return f"PDF processed successfully. Total pages: {len(doc_state.current_doc_images)}. You can now chat with the bot." | |
except Exception as e: | |
return f"Error processing PDF: {str(e)}. Please try a different PDF file." | |
elif file_ext in image_extensions: | |
doc_state.doc_type = 'image' | |
try: | |
img = Image.open(file_path).convert("RGB") | |
max_size = 1600 | |
if max(img.size) > max_size: | |
ratio = max_size / max(img.size) | |
new_size = tuple(int(dim * ratio) for dim in img.size) | |
img = img.resize(new_size, Image.Resampling.LANCZOS) | |
doc_state.current_doc_images = [img] | |
return "Image loaded successfully. You can now chat with the bot." | |
except Exception as e: | |
return f"Error processing image: {str(e)}. Please try a different image file." | |
else: | |
return f"Unsupported file type: {file_ext}. Please upload a PDF or image file (PNG, JPG, JPEG, GIF, BMP, WEBP)." | |
except Exception as e: | |
logger.error(f"Error in process_uploaded_file: {str(e)}") | |
return "An error occurred while processing the file. Please try again." | |
def clear_context(): | |
"""Clear the current document context and chat history.""" | |
doc_state.clear() | |
return "Document context cleared. You can upload a new document.", [] | |
# ------------------------------- | |
# Predetermined Prompts | |
# ------------------------------- | |
predetermined_prompts = { | |
"Software Tester": ( | |
"Act as a software tester. Analyze the uploaded image of a software interface and generate comprehensive " | |
"test cases for its features. For each feature, provide test steps, expected results, and any necessary " | |
"preconditions. Be as detailed as possible." | |
) | |
} | |
# ------------------------------- | |
# Chat Function with Streaming and Conversation History | |
# ------------------------------- | |
def chat_respond(user_message, history, prompt_option): | |
""" | |
Append the user message (or, if starting a new conversation and no message is provided, | |
use the predetermined prompt) to the conversation history; build the API call using | |
the full conversation history (and the image if available); stream back the assistant response | |
while updating the history. | |
The history is a list of [user_text, assistant_text] pairs. | |
""" | |
# If this is the first message, add the predetermined prompt text. | |
if history == []: | |
# If user_message is empty, use the predetermined prompt. | |
if not user_message.strip(): | |
user_message = predetermined_prompts.get(prompt_option, "Hello") | |
else: | |
# Optionally, prepend the predetermined prompt. | |
user_message = predetermined_prompts.get(prompt_option, "") + "\n" + user_message | |
# Append the new user message with an empty assistant response. | |
history = history + [[user_message, ""]] | |
# Build the messages list (for the multimodal API) from the conversation history. | |
messages = [] | |
for i, (user_msg, assistant_msg) in enumerate(history): | |
# For the user message: | |
user_content = [{"type": "text", "text": user_msg}] | |
# For the very first user message, if an image was uploaded, append the image. | |
if i == 0 and doc_state.current_doc_images: | |
buffered = io.BytesIO() | |
doc_state.current_doc_images[0].save(buffered, format="PNG") | |
img_b64 = base64.b64encode(buffered.getvalue()).decode("utf-8") | |
data_uri = f"data:image/png;base64,{img_b64}" | |
user_content.append({ | |
"type": "image_url", | |
"image_url": {"url": data_uri} | |
}) | |
messages.append({"role": "user", "content": user_content}) | |
# For the assistant response, if available. | |
if assistant_msg: | |
messages.append({ | |
"role": "assistant", | |
"content": [{"type": "text", "text": assistant_msg}] | |
}) | |
# Call the inference API with streaming enabled. | |
try: | |
stream = client.chat.completions.create( | |
model="google/gemini-2.0-pro-exp-02-05:free", | |
messages=messages, | |
max_tokens=8192, | |
stream=True | |
) | |
except Exception as e: | |
logger.error(f"Error calling the API: {str(e)}") | |
history[-1][1] = "An error occurred while processing your request. Please try again." | |
yield history, history | |
# Stream and update the assistant's reply token by token. | |
buffer = "" | |
for chunk in stream: | |
delta = chunk.choices[0].delta.content | |
buffer += delta | |
# Update the assistant part of the latest message in the history. | |
history[-1][1] = buffer | |
# Yield the updated chat history (for the Chatbot component) and the state. | |
yield history, history | |
time.sleep(0.01) | |
return history, history | |
# ------------------------------- | |
# Create the Gradio Interface | |
# ------------------------------- | |
with gr.Blocks() as demo: | |
gr.Markdown("# Document Analyzer & Software Testing Chatbot") | |
gr.Markdown( | |
"Upload a PDF or an image (PNG, JPG, JPEG, GIF, BMP, WEBP). Then choose a prompt from the dropdown. " | |
"For example, select **Software Tester** to have the bot analyze an image of a software interface " | |
"and generate test cases. Chat with the bot in the conversation below." | |
) | |
with gr.Row(): | |
file_upload = gr.File( | |
label="Upload Document", | |
file_types=[".pdf", ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp"] | |
) | |
upload_status = gr.Textbox(label="Upload Status", interactive=False) | |
with gr.Row(): | |
prompt_dropdown = gr.Dropdown( | |
label="Select Prompt", | |
choices=[ | |
"Software Tester" | |
], | |
value="Software Tester" | |
) | |
clear_btn = gr.Button("Clear Document Context & Chat History") | |
chatbot = gr.Chatbot(label="Chat History", elem_id="chatbot") | |
with gr.Row(): | |
user_input = gr.Textbox(label="Your Message", placeholder="Type your message here...", show_label=False) | |
send_btn = gr.Button("Send") | |
# State to hold the conversation history | |
chat_state = gr.State([]) | |
# When a file is uploaded, process it. | |
file_upload.change(fn=process_uploaded_file, inputs=file_upload, outputs=upload_status) | |
# Clear both the document context and chat history. | |
clear_btn.click(fn=clear_context, outputs=[upload_status, chat_state]) | |
# When the user clicks Send, process the message and update the chat. | |
send_btn.click(fn=chat_respond, | |
inputs=[user_input, chat_state, prompt_dropdown], | |
outputs=[chatbot, chat_state]) | |
demo.launch(debug=True) | |