documentaitest / app.py
IAMTFRMZA's picture
Update app.py
90e2f9d verified
raw
history blame
6.12 kB
import streamlit as st
import os
import time
import re
import uuid
import threading
import sounddevice as sd
import numpy as np
from openai import OpenAI
from realtime_transcriber import WebSocketClient, connections, WEBSOCKET_URI, WEBSOCKET_HEADERS
# ------------------ App Configuration ------------------
# Page-level settings, followed by the visible page header.
st.set_page_config(page_title="Document AI Assistant", layout="wide")
# The emoji in the title was stored as mojibake (UTF-8 bytes decoded with the
# wrong code page); it is restored to the intended "page" glyph here.
st.title("📄 Document AI Assistant")
st.caption("Chat with an AI Assistant on your medical/pathology documents and voice")

# ------------------ Load API Key and Assistant ID ------------------
# Both values come from the process environment; either one missing (or empty)
# makes the app unusable, so the script halts with a visible error instead of
# failing later inside an API call.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
ASSISTANT_ID = os.environ.get("ASSISTANT_ID")

if not OPENAI_API_KEY or not ASSISTANT_ID:
    st.error("Missing secrets. Please ensure both OPENAI_API_KEY and ASSISTANT_ID are set in your Hugging Face Space secrets.")
    st.stop()

# Shared OpenAI client used by the chat section further down the script.
client = OpenAI(api_key=OPENAI_API_KEY)
# ------------------ Session State Initialization ------------------
# Seed every per-session key exactly once. Keys that already exist (from an
# earlier run of this script in the same session) keep their current value.
for state_key, initial_value in (
    ("messages", []),          # chat history as {"role", "content"} dicts
    ("thread_id", None),       # assistant thread id, created on first prompt
    ("image_url", None),       # last document image URL found in a reply
    ("image_updated", False),  # set when a reply supplied a new image URL
):
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value
# ------------------ Sidebar ------------------
# The emoji in the three sidebar labels were stored as mojibake (UTF-8 bytes
# decoded with the wrong code page); the intended glyphs are restored below.
st.sidebar.header("🔧 Settings")

# "Clear Chat" wipes the conversation, the assistant thread and the displayed
# document image, then reruns the script so the page redraws empty.
if st.sidebar.button("🔄 Clear Chat"):
    st.session_state.messages = []
    st.session_state.thread_id = None
    st.session_state.image_url = None
    st.session_state.image_updated = False
    st.rerun()

# Toggle read by the left panel to decide whether the document image is shown.
show_image = st.sidebar.checkbox("📖 Show Document Image", value=True)
# ------------------ Section 1: Layout (Image + Chat) ------------------
# Two columns at a 1:2 width ratio: document image on the left, chat on the right.
col1, col2 = st.columns([1, 2])

# ----- Left Panel: Document Image -----
with col1:
    # Only draw when the sidebar toggle is on and a reply has supplied an image URL.
    if show_image and st.session_state.image_url:
        # The caption emoji was stored as mojibake; the intended glyph is restored.
        st.image(st.session_state.image_url, caption="📑 Extracted Page", use_container_width=True)
        # The image is on screen now, so clear the "new image" flag.
        st.session_state.image_updated = False
# ----- Right Panel: Chat Assistant -----
with col2:
    prompt = st.chat_input("Type your question about the document...")

    # Group the flat history into exchanges. Every assistant reply closes the
    # current exchange, so a question that never received an answer (e.g. the
    # API call failed) stays grouped with the next question instead of merging
    # every later exchange into one ever-growing group.
    paired_messages = []
    buffer = []
    for msg in st.session_state.messages:
        buffer.append(msg)
        if msg["role"] == "assistant":
            paired_messages.append(buffer)
            buffer = []
    if buffer:
        # Trailing messages that have no assistant reply yet.
        paired_messages.append(buffer)

    # Render exchanges newest first; inside an exchange keep the original
    # question-then-answer order.
    with st.container():
        for pair in reversed(paired_messages):
            for msg in pair:
                with st.chat_message(msg["role"]):
                    st.write(msg["content"])

    # Handle new prompt
    if prompt:
        st.session_state.messages.append({"role": "user", "content": prompt})
        try:
            # Lazily create the assistant thread on the first question of a session.
            if st.session_state.thread_id is None:
                thread = client.beta.threads.create()
                st.session_state.thread_id = thread.id

            client.beta.threads.messages.create(
                thread_id=st.session_state.thread_id,
                role="user",
                content=prompt,
            )
            run = client.beta.threads.runs.create(
                thread_id=st.session_state.thread_id,
                assistant_id=ASSISTANT_ID,
            )

            with st.spinner("Assistant is thinking..."):
                while True:
                    run_status = client.beta.threads.runs.retrieve(
                        thread_id=st.session_state.thread_id,
                        run_id=run.id,
                    )
                    if run_status.status == "completed":
                        break
                    # Anything that is neither finished nor still progressing
                    # (failed, cancelled, expired, incomplete, requires_action)
                    # can never reach "completed" in this app, so stop polling
                    # instead of spinning forever.
                    if run_status.status not in ("queued", "in_progress", "cancelling"):
                        raise RuntimeError(
                            f"Assistant run ended with status '{run_status.status}'"
                        )
                    time.sleep(1)

            # Ask for newest-first explicitly and take the first assistant
            # message produced by *this* run; walking the list oldest-first
            # would return the very first reply of the thread every time.
            messages = client.beta.threads.messages.list(
                thread_id=st.session_state.thread_id,
                order="desc",
            )
            assistant_message = None
            for message in messages.data:
                if message.role != "assistant" or message.run_id != run.id:
                    continue
                # A reply may start with a non-text part (e.g. an image file);
                # use the first text part rather than assuming content[0] is text.
                assistant_message = next(
                    (part.text.value for part in message.content if part.type == "text"),
                    None,
                )
                if assistant_message is not None:
                    break

            if assistant_message is None:
                # Never store a None reply in the history or feed it to re.search.
                raise RuntimeError("The assistant did not return a text reply.")

            st.session_state.messages.append({"role": "assistant", "content": assistant_message})

            # If the reply links a page image from the pathology manual repo,
            # remember it so the left panel shows it on the next run.
            image_match = re.search(
                r'https://raw\.githubusercontent\.com/AndrewLORTech/surgical-pathology-manual/main/[\w\-/]*\.png',
                assistant_message,
            )
            if image_match:
                st.session_state.image_url = image_match.group(0)
                st.session_state.image_updated = True
        except Exception as e:
            # Top-level UI boundary: surface the failure to the user.
            st.error(f"❌ Error: {e}")
        else:
            # Success only: rerun so the new exchange (and image) are rendered.
            st.rerun()
# ------------------ Section 3: Voice Transcription ------------------
st.markdown("---")
# The emoji in this header and in the checkbox label below were stored as
# mojibake; the intended glyphs are restored.
st.markdown("## 🎙️ Real-Time Voice Transcription")

# Init client: one WebSocket transcription client per browser session, driven
# by a daemon thread and registered in the shared `connections` mapping.
if "voice_client_id" not in st.session_state:
    client_id = str(uuid.uuid4())
    st.session_state.voice_client_id = client_id
    connections[client_id] = WebSocketClient(WEBSOCKET_URI, WEBSOCKET_HEADERS, client_id)
    threading.Thread(target=connections[client_id].run, daemon=True).start()

client_id = st.session_state.voice_client_id
transcript_placeholder = st.empty()
recording = st.checkbox("🎤 Start Recording")

if recording:
    st.warning("Recording is active. Speak clearly...")

    # The script reruns on every interaction. Open the microphone only once
    # per recording session; otherwise each rerun would start another stream
    # and orphan the previous one while it is still capturing.
    if "stream" not in st.session_state:

        def audio_stream_callback(indata, frames, time_info, status):
            """Forward one captured audio chunk to this session's transcriber.

            Invoked by sounddevice for every input buffer. It deliberately
            does not touch any Streamlit element: the callback does not run
            in the Streamlit script thread, so UI updates are done by the
            script itself below.
            """
            if status:
                print(f"⚠️ Audio status: {status}")
            if client_id in connections:
                # Copy the buffer before handing it off to the transcriber.
                connections[client_id].enqueue_audio_chunk(16000, indata.copy())

        stream = sd.InputStream(callback=audio_stream_callback, channels=1, samplerate=16000)
        stream.start()
        st.session_state["stream"] = stream

    # Show the transcript gathered so far; refreshed on each script run.
    if client_id in connections:
        transcript_placeholder.markdown(
            f"**Live Transcript:**\n\n{connections[client_id].transcript}"
        )
else:
    if "stream" in st.session_state:
        stream = st.session_state["stream"]
        # Forget the stream first so a failure while stopping cannot leave a
        # dead stream registered in the session.
        del st.session_state["stream"]
        stream.stop()
        # stop() only halts capture; close() releases the audio device.
        stream.close()
        st.success("Recording stopped.")

# Final transcript
if client_id in connections:
    st.markdown("**Final Transcript Output:**")
    st.markdown(connections[client_id].transcript)