Vision_tester / app.py
Daemontatox's picture
Update app.py
1ca242c verified
raw
history blame
10.3 kB
import os
import io
import time
import base64
import logging
import fitz # PyMuPDF
from PIL import Image
import gradio as gr
from openai import OpenAI # Use the OpenAI client that supports multimodal messages
# Load the API key from the environment (configured as a secret).
# NOTE: the constant keeps its existing name HF_API_KEY, but the value is read
# from the OPENAI_TOKEN variable and is used against the OpenRouter endpoint below.
HF_API_KEY = os.getenv("OPENAI_TOKEN")
if not HF_API_KEY:
    # Name the variable that is actually read so the message is actionable.
    raise ValueError("OPENAI_TOKEN environment variable not set")

# Create an OpenAI-compatible client pointing at the OpenRouter API.
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=HF_API_KEY,
)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# -------------------------------
# Document State and File Processing
# -------------------------------
class DocumentState:
    """Container for the currently loaded document.

    Attributes are plain instance fields:
      * current_doc_images -- list of images for the loaded document
      * current_doc_text   -- extracted text ("" when there is none)
      * doc_type           -- document type tag, or None when nothing is loaded
    """

    def __init__(self):
        # A new instance starts out in exactly the state clear() produces.
        self.clear()

    def clear(self):
        """Reset every field to its empty value."""
        self.current_doc_images = []
        self.current_doc_text = ""
        self.doc_type = None


# Single module-level instance used by the upload, clear and chat handlers.
doc_state = DocumentState()
def process_pdf_file(file_path):
    """Convert PDF pages to images and extract text using PyMuPDF.

    Args:
        file_path: Path of the PDF file to open.

    Returns:
        A ``(images, text)`` tuple. ``images`` is a list of RGB PIL images, one
        per page that rendered successfully (longest side capped at 1600 px).
        ``text`` is the concatenation of every non-empty page text, each
        prefixed with ``"Page N:"`` (1-based).

    Raises:
        ValueError: If no page could be rendered to an image.
        Exception: Any other failure (e.g. the file cannot be opened) is logged
            and re-raised unchanged.
    """
    zoom = 3  # render scale factor passed to fitz.Matrix
    max_size = 1600  # longest allowed side of a rendered page, in pixels
    try:
        doc = fitz.open(file_path)
        images = []
        text_parts = []
        try:
            mat = fitz.Matrix(zoom, zoom)  # identical for every page
            for page_num in range(doc.page_count):
                try:
                    page = doc[page_num]
                    page_text = page.get_text("text")
                    if page_text.strip():
                        text_parts.append(f"Page {page_num + 1}:\n{page_text}\n\n")
                    # Render page as an image with a zoom factor
                    pix = page.get_pixmap(matrix=mat, alpha=False)
                    img_data = pix.tobytes("png")
                    img = Image.open(io.BytesIO(img_data)).convert("RGB")
                    # Resize if image is too large
                    if max(img.size) > max_size:
                        ratio = max_size / max(img.size)
                        new_size = tuple(int(dim * ratio) for dim in img.size)
                        img = img.resize(new_size, Image.Resampling.LANCZOS)
                    images.append(img)
                except Exception as e:
                    # One bad page must not abort the whole document; report it
                    # with the same 1-based numbering used in the extracted text.
                    logger.error(f"Error processing page {page_num + 1}: {str(e)}")
                    continue
        finally:
            # Always release the document, even if something above raised.
            doc.close()
        if not images:
            raise ValueError("No valid images could be extracted from the PDF")
        return images, "".join(text_parts)
    except Exception as e:
        logger.error(f"Error processing PDF file: {str(e)}")
        raise
def process_uploaded_file(file):
    """Process an uploaded file (PDF or image) and update document state.

    The shared ``doc_state`` is cleared first, then filled from the upload.

    Args:
        file: The value delivered by the Gradio upload component: ``None``, a
            path string, a dict with a ``"name"`` entry, or an object with a
            ``.name`` attribute holding the path.

    Returns:
        A human-readable status message. Failures are reported through the
        message rather than raised.
    """
    try:
        doc_state.clear()
        if file is None:
            return "No file uploaded. Please upload a file."
        # Get the file path from the Gradio upload. Depending on the component
        # configuration this is a plain path, a dict, or a file-like object.
        if isinstance(file, str):
            file_path = file
        elif isinstance(file, dict):
            file_path = file["name"]
        else:
            file_path = file.name
        # splitext only looks at the final path component, so a dot in a
        # directory name or a file without extension is not misread.
        file_ext = os.path.splitext(file_path)[1].lower().lstrip(".")
        image_extensions = {'png', 'jpg', 'jpeg', 'gif', 'bmp', 'webp'}
        if file_ext == 'pdf':
            try:
                doc_state.current_doc_images, doc_state.current_doc_text = process_pdf_file(file_path)
                # Record the type only once the content is actually loaded.
                doc_state.doc_type = 'pdf'
                return f"PDF processed successfully. Total pages: {len(doc_state.current_doc_images)}. You can now chat with the bot."
            except Exception as e:
                return f"Error processing PDF: {str(e)}. Please try a different PDF file."
        elif file_ext in image_extensions:
            try:
                # convert() returns a new in-memory image, so the file handle
                # can be closed as soon as the with-block ends.
                with Image.open(file_path) as source_img:
                    img = source_img.convert("RGB")
                max_size = 1600
                if max(img.size) > max_size:
                    ratio = max_size / max(img.size)
                    new_size = tuple(int(dim * ratio) for dim in img.size)
                    img = img.resize(new_size, Image.Resampling.LANCZOS)
                doc_state.current_doc_images = [img]
                doc_state.doc_type = 'image'
                return "Image loaded successfully. You can now chat with the bot."
            except Exception as e:
                return f"Error processing image: {str(e)}. Please try a different image file."
        else:
            shown_ext = file_ext or "unknown"
            return f"Unsupported file type: {shown_ext}. Please upload a PDF or image file (PNG, JPG, JPEG, GIF, BMP, WEBP)."
    except Exception as e:
        logger.error(f"Error in process_uploaded_file: {str(e)}")
        return "An error occurred while processing the file. Please try again."
def clear_context():
    """Reset the shared document state.

    Returns:
        A ``(status_message, history)`` pair where ``history`` is a new empty list.
    """
    doc_state.clear()
    status_message = "Document context cleared. You can upload a new document."
    empty_history = []
    return status_message, empty_history
# -------------------------------
# Predetermined Prompts
# -------------------------------
# Instruction text for the "Software Tester" option, split one sentence per line.
_SOFTWARE_TESTER_PROMPT = (
    "Act as a software tester. "
    "Analyze the uploaded image of a software interface and generate comprehensive test cases for its features. "
    "For each feature, provide test steps, expected results, and any necessary preconditions. "
    "Be as detailed as possible."
)

# Maps a prompt option name to the instruction text looked up by chat_respond.
predetermined_prompts = {
    "Software Tester": _SOFTWARE_TESTER_PROMPT,
}
# -------------------------------
# Chat Function with Streaming and Conversation History
# -------------------------------
def chat_respond(user_message, history, prompt_option):
    """Send the conversation to the model and stream the reply into the history.

    On the first turn the predetermined prompt selected by ``prompt_option`` is
    used: it replaces an empty message, or is prepended (followed by a newline)
    to a non-empty one. If a document is loaded, only its first image is
    attached, and only to the first user message; extracted PDF text is not sent.

    Args:
        user_message: Text typed by the user (``None`` is treated as empty).
        history: List of ``[user_text, assistant_text]`` pairs (``None`` is
            treated as an empty conversation).
        prompt_option: Key into ``predetermined_prompts``.

    Yields:
        ``(history, history)`` after every received piece of text, once more
        when the stream ends, or once with an error message in the assistant
        slot if the request fails.
    """
    error_message = "An error occurred while processing your request. Please try again."
    history = history or []
    user_message = user_message or ""

    # If this is the first message, add the predetermined prompt text.
    if not history:
        # If user_message is empty, use the predetermined prompt.
        if not user_message.strip():
            user_message = predetermined_prompts.get(prompt_option, "Hello")
        else:
            # Prepend the predetermined prompt.
            user_message = predetermined_prompts.get(prompt_option, "") + "\n" + user_message

    # Append the new user message with an empty assistant response.
    history = history + [[user_message, ""]]

    # Build the messages list (for the multimodal API) from the conversation history.
    messages = []
    for i, (user_msg, assistant_msg) in enumerate(history):
        user_content = [{"type": "text", "text": user_msg}]
        # For the very first user message, if an image was uploaded, append the image.
        if i == 0 and doc_state.current_doc_images:
            buffered = io.BytesIO()
            doc_state.current_doc_images[0].save(buffered, format="PNG")
            img_b64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
            data_uri = f"data:image/png;base64,{img_b64}"
            user_content.append({
                "type": "image_url",
                "image_url": {"url": data_uri}
            })
        messages.append({"role": "user", "content": user_content})
        # For the assistant response, if available.
        if assistant_msg:
            messages.append({
                "role": "assistant",
                "content": [{"type": "text", "text": assistant_msg}]
            })

    # Call the inference API with streaming enabled.
    try:
        stream = client.chat.completions.create(
            model="google/gemini-2.0-pro-exp-02-05:free",
            messages=messages,
            max_tokens=8192,
            stream=True
        )
    except Exception as e:
        logger.error(f"Error calling the API: {str(e)}")
        history[-1][1] = error_message
        yield history, history
        # Stop here: `stream` was never bound, so the loop below must not run.
        return

    # Stream and update the assistant's reply piece by piece.
    buffer = ""
    try:
        for chunk in stream:
            # Some chunks carry no choices or no text; skip them instead of
            # failing on an index error or on `str += None`.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta.content
            if not delta:
                continue
            buffer += delta
            # Update the assistant part of the latest message in the history.
            history[-1][1] = buffer
            # Yield the updated chat history (for the Chatbot component) and the state.
            yield history, history
            time.sleep(0.01)
    except Exception as e:
        logger.error(f"Error while streaming the response: {str(e)}")
        # Keep whatever text already arrived and tell the user it was cut short.
        history[-1][1] = f"{buffer}\n\n{error_message}" if buffer else error_message
        yield history, history
        return

    # Final update so the new turn is shown even if the stream carried no text.
    # (A `return value` in a generator would never reach the caller.)
    yield history, history
# -------------------------------
# Create the Gradio Interface
# -------------------------------
# Layout: title and instructions, upload row, prompt/clear row, chat display,
# message row. Event handlers are wired at the bottom of the block.
with gr.Blocks() as demo:
    gr.Markdown("# Document Analyzer & Software Testing Chatbot")
    gr.Markdown(
        "Upload a PDF or an image (PNG, JPG, JPEG, GIF, BMP, WEBP). Then choose a prompt from the dropdown. "
        "For example, select **Software Tester** to have the bot analyze an image of a software interface "
        "and generate test cases. Chat with the bot in the conversation below."
    )
    with gr.Row():
        file_upload = gr.File(
            label="Upload Document",
            file_types=[".pdf", ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp"]
        )
        upload_status = gr.Textbox(label="Upload Status", interactive=False)
    with gr.Row():
        prompt_dropdown = gr.Dropdown(
            label="Select Prompt",
            choices=[
                "Software Tester"
            ],
            value="Software Tester"
        )
        clear_btn = gr.Button("Clear Document Context & Chat History")
    chatbot = gr.Chatbot(label="Chat History", elem_id="chatbot")
    with gr.Row():
        user_input = gr.Textbox(label="Your Message", placeholder="Type your message here...", show_label=False)
        send_btn = gr.Button("Send")
    # State to hold the conversation history
    chat_state = gr.State([])

    def _clear_all():
        """Run clear_context and also return an empty list for the Chatbot display."""
        status, empty_history = clear_context()
        # The visible Chatbot gets its own empty list; resetting only the State
        # would leave the old conversation on screen.
        return status, empty_history, []

    # When a file is uploaded, process it.
    file_upload.change(fn=process_uploaded_file, inputs=file_upload, outputs=upload_status)
    # Clear the document context, the stored history, and the displayed chat.
    clear_btn.click(fn=_clear_all, outputs=[upload_status, chat_state, chatbot])
    # When the user clicks Send, process the message and update the chat.
    send_btn.click(fn=chat_respond,
                   inputs=[user_input, chat_state, prompt_dropdown],
                   outputs=[chatbot, chat_state])
demo.launch(debug=True)