Spaces:
Sleeping
Sleeping
import streamlit as st | |
import os | |
import time | |
import re | |
import uuid | |
import threading | |
import sounddevice as sd | |
import numpy as np | |
from openai import OpenAI | |
from realtime_transcriber import WebSocketClient, connections, WEBSOCKET_URI, WEBSOCKET_HEADERS | |
# ------------------ App Configuration ------------------
# set_page_config is the first Streamlit call issued by this script.
st.set_page_config(page_title="Document AI Assistant", layout="wide")
st.title("π Document AI Assistant")
st.caption("Chat with an AI Assistant on your medical/pathology documents and voice")

# ------------------ Load API Key and Assistant ID ------------------
# Both values come from the process environment; a missing variable yields None.
_environment = os.environ
OPENAI_API_KEY = _environment.get("OPENAI_API_KEY")
ASSISTANT_ID = _environment.get("ASSISTANT_ID")

# Stop this script run when either value is missing or empty.
if not (OPENAI_API_KEY and ASSISTANT_ID):
    st.error("Missing secrets. Please ensure both OPENAI_API_KEY and ASSISTANT_ID are set in your Hugging Face Space secrets.")
    st.stop()

# Shared OpenAI client used by the chat section below.
client = OpenAI(api_key=OPENAI_API_KEY)
# ------------------ Session State Initialization ------------------
# Seed each per-session key only when it is absent, so values stored by an
# earlier run of this script are left alone.
for _state_key, _initial_value in (
    ("messages", []),          # chat history as {"role", "content"} dicts
    ("thread_id", None),       # assistant thread id, created on first prompt
    ("image_url", None),       # last image URL found in an assistant reply
    ("image_updated", False),  # set when a new image URL has been stored
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# ------------------ Sidebar ------------------
st.sidebar.header("π§ Settings")

clear_requested = st.sidebar.button("π Clear Chat")
if clear_requested:
    # Put the conversation, thread and image state back to their initial
    # values, then restart the script run so the page redraws empty.
    st.session_state["messages"] = []
    st.session_state["thread_id"] = None
    st.session_state["image_url"] = None
    st.session_state["image_updated"] = False
    st.rerun()

# Toggle read by the left-hand image panel; ticked by default.
show_image = st.sidebar.checkbox("π Show Document Image", value=True)
# ------------------ Section 1: Layout (Image + Chat) ------------------
# Two side-by-side columns with relative widths 1 and 2.
col1, col2 = st.columns([1, 2])

# ----- Left Panel: Document Image -----
with col1:
    if show_image:
        current_image = st.session_state.image_url
        if current_image:
            st.image(current_image, caption="π Extracted Page", use_container_width=True)
            # The stored image has now been drawn, so clear the "updated" marker.
            st.session_state.image_updated = False
# ----- Right Panel: Chat Assistant -----
# Seconds between run-status polls, and the longest the UI waits for one run.
RUN_POLL_INTERVAL_SECONDS = 1
RUN_TIMEOUT_SECONDS = 120

# Run states in which the assistant is still working and polling should go on.
# Any other non-"completed" state is treated as final.
_PENDING_RUN_STATUSES = frozenset({"queued", "in_progress", "cancelling"})


def _group_exchanges(history):
    """Split the chat history into exchanges.

    An exchange is one user message followed by the replies that came after
    it. A new group starts at every user message, so a question that never
    received an answer (for example after an API error) forms a group of its
    own instead of absorbing every later message.

    Args:
        history: list of ``{"role": ..., "content": ...}`` dicts, oldest first.

    Returns:
        A list of lists of those dicts, oldest exchange first.
    """
    exchanges = []
    for entry in history:
        if entry["role"] == "user" or not exchanges:
            exchanges.append([])
        exchanges[-1].append(entry)
    return exchanges


def _wait_for_run(thread_id, run_id):
    """Poll a run until it reaches the ``completed`` state.

    Raises:
        RuntimeError: if the run ends in any other final state (failed,
            cancelled, expired, waiting for tool output, ...) or does not
            complete within RUN_TIMEOUT_SECONDS.
    """
    deadline = time.monotonic() + RUN_TIMEOUT_SECONDS
    while True:
        run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
        if run_status.status == "completed":
            return
        if run_status.status not in _PENDING_RUN_STATUSES:
            # Without this check a failed or expired run would be polled forever.
            raise RuntimeError(f"Assistant run ended with status '{run_status.status}'.")
        if time.monotonic() >= deadline:
            raise RuntimeError(
                f"Assistant run did not complete within {RUN_TIMEOUT_SECONDS} seconds."
            )
        time.sleep(RUN_POLL_INTERVAL_SECONDS)


def _latest_reply_text(thread_id, run_id):
    """Return the text of the newest assistant message produced by ``run_id``.

    Returns:
        The first text part of that message, or None when the run produced
        no assistant message containing text.
    """
    # Ask for newest-first explicitly so the first assistant hit is the latest reply.
    listing = client.beta.threads.messages.list(thread_id=thread_id, order="desc")
    for message in listing.data:
        if message.role != "assistant":
            continue
        produced_by = getattr(message, "run_id", None)
        if produced_by is not None and produced_by != run_id:
            # Reply belongs to an earlier question in the same thread.
            continue
        for part in message.content:
            # Content can also hold non-text parts (e.g. images); skip those.
            if getattr(part, "type", None) == "text":
                return part.text.value
    return None


with col2:
    prompt = st.chat_input("Type your question about the document...")

    # Render exchanges from newest to oldest; inside one exchange the
    # messages keep their chronological order.
    with st.container():
        for exchange in reversed(_group_exchanges(st.session_state.messages)):
            for msg in exchange:
                with st.chat_message(msg["role"]):
                    st.write(msg["content"])

    # Handle new prompt
    if prompt:
        st.session_state.messages.append({"role": "user", "content": prompt})
        try:
            # The assistant thread is created lazily on the first question.
            if st.session_state.thread_id is None:
                thread = client.beta.threads.create()
                st.session_state.thread_id = thread.id
            thread_id = st.session_state.thread_id

            client.beta.threads.messages.create(
                thread_id=thread_id,
                role="user",
                content=prompt,
            )
            run = client.beta.threads.runs.create(
                thread_id=thread_id,
                assistant_id=ASSISTANT_ID,
            )

            with st.spinner("Assistant is thinking..."):
                _wait_for_run(thread_id, run.id)

            assistant_message = _latest_reply_text(thread_id, run.id)
            if assistant_message is None:
                raise RuntimeError("The assistant returned no text reply.")

            st.session_state.messages.append({"role": "assistant", "content": assistant_message})

            # Remember the first manual-page image URL mentioned in the reply
            # so the left panel can display it on the next run.
            image_match = re.search(
                r'https://raw\.githubusercontent\.com/AndrewLORTech/surgical-pathology-manual/main/[\w\-/]*\.png',
                assistant_message
            )
            if image_match:
                st.session_state.image_url = image_match.group(0)
                st.session_state.image_updated = True

            # Restart the script so the new exchange is rendered above.
            st.rerun()
        except Exception as e:
            st.error(f"β Error: {e}")
# ------------------ Section 3: Voice Transcription ------------------
st.markdown("---")
st.markdown("## ποΈ Real-Time Voice Transcription")

# Init client
# The first run of a session creates one WebSocketClient, registers it in the
# shared `connections` mapping under a fresh UUID and starts its run() method
# on a daemon thread; later runs only look the id up again.
if "voice_client_id" not in st.session_state:
    client_id = str(uuid.uuid4())
    st.session_state.voice_client_id = client_id
    connections[client_id] = WebSocketClient(WEBSOCKET_URI, WEBSOCKET_HEADERS, client_id)
    threading.Thread(target=connections[client_id].run, daemon=True).start()
client_id = st.session_state.voice_client_id

transcript_placeholder = st.empty()
recording = st.checkbox("π€ Start Recording")

if recording:
    st.warning("Recording is active. Speak clearly...")

    def audio_stream_callback(indata, frames, time_info, status):
        """Hand one captured audio chunk to this session's transcriber client.

        Invoked by the sounddevice input stream. `frames` and `time_info`
        are part of the callback signature but are not used here.
        """
        if status:
            print(f"β οΈ Audio status: {status}")
        if client_id in connections:
            # Copy the buffer before queueing it; 16000 matches the stream's samplerate.
            connections[client_id].enqueue_audio_chunk(16000, indata.copy())
        # NOTE(review): the placeholder is deliberately not updated from here.
        # This callback runs on the audio library's thread, which presumably
        # has no Streamlit script-run context — confirm against Streamlit's
        # threading docs. The transcript is rendered from the script thread below.

    # Open the input stream only once per recording session. Streamlit reruns
    # this script on every interaction, and opening a new stream each time
    # would leave the previous one running with no reference left to stop it.
    if "stream" not in st.session_state:
        stream = sd.InputStream(callback=audio_stream_callback, channels=1, samplerate=16000)
        stream.start()
        st.session_state["stream"] = stream

    # Show the transcript as of this script run.
    if client_id in connections:
        transcript_placeholder.markdown(f"**Live Transcript:**\n\n{connections[client_id].transcript}")
else:
    if "stream" in st.session_state:
        active_stream = st.session_state["stream"]
        active_stream.stop()
        # Close as well as stop so the audio device is released.
        active_stream.close()
        del st.session_state["stream"]
        st.success("Recording stopped.")

# Final transcript
if client_id in connections:
    st.markdown("**Final Transcript Output:**")
    st.markdown(connections[client_id].transcript)