# Document AI Assistant — Streamlit app (Hugging Face Space)
import os
import re
import tempfile
import time

import av
import numpy as np
import soundfile as sf
import streamlit as st
from openai import OpenAI
from streamlit_webrtc import webrtc_streamer, AudioProcessorBase
# ------------------ Audio Processor ------------------
class AudioRecorder(AudioProcessorBase):
    """Accumulate incoming WebRTC audio frames for later transcription.

    Frames are appended to ``recorded_frames`` as they arrive and read out
    by the app once the stream stops playing.
    """

    def __init__(self):
        # Public buffer: consumed by the transcription step after recording.
        self.recorded_frames = []

    def recv(self, frame: av.AudioFrame) -> av.AudioFrame:
        """Store each incoming frame and pass it through unchanged."""
        self.recorded_frames.append(frame)
        return frame
# ------------------ App Configuration ------------------
# set_page_config must be the first Streamlit command executed on each run.
st.set_page_config(page_title="Document AI Assistant", layout="wide")
st.title("π Document AI Assistant")
st.caption("Chat with an AI Assistant on your medical/pathology documents")

# ------------------ Load API Key and Assistant ID from Hugging Face Secrets ------------------
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
ASSISTANT_ID = os.environ.get("ASSISTANT_ID")

# ------------------ Error Handling for Missing Secrets ------------------
# Halt the script early when either secret is absent; st.stop() prevents the
# OpenAI client below from being constructed with a missing key.
if not (OPENAI_API_KEY and ASSISTANT_ID):
    st.error("β Missing secrets. Please ensure both OPENAI_API_KEY and ASSISTANT_ID are set in your Hugging Face Space secrets.")
    st.stop()

client = OpenAI(api_key=OPENAI_API_KEY)
# ------------------ Session State Initialization ------------------
# Seed every per-session key exactly once so later code can read them
# unconditionally. The dict literal is rebuilt on every rerun, so the
# mutable [] default is never shared across sessions.
for _key, _default in {
    "messages": [],
    "thread_id": None,
    "image_url": None,
    "image_updated": False,
    "transcript_preview": None,
}.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default
# ------------------ Sidebar Controls ------------------
st.sidebar.header("π§ Settings")

# Clearing the chat resets every piece of per-session state, then reruns
# so the UI reflects the empty conversation immediately.
if st.sidebar.button("π Clear Chat"):
    for _key, _reset in (
        ("messages", []),
        ("thread_id", None),
        ("image_url", None),
        ("image_updated", False),
        ("transcript_preview", None),
    ):
        st.session_state[_key] = _reset
    st.rerun()

show_image = st.sidebar.checkbox("π Show Document Image", value=True)
preview_toggle = st.sidebar.checkbox("π Preview transcription before sending", value=True)
# ------------------ Split Layout ------------------
# Left column (1/3): document image. Right column (2/3): voice input + chat.
col1, col2 = st.columns([1, 2])

# ------------------ Image Panel (Left) ------------------
with col1:
    # Only render when the toggle is on AND an image URL has been extracted.
    if show_image and st.session_state.image_url:
        st.image(st.session_state.image_url, caption="π Extracted Page", use_container_width=True)
        # Mark the freshly-extracted image as displayed.
        st.session_state.image_updated = False
# ------------------ Voice Input Processing ------------------
with col2:
    st.markdown("### ποΈ Voice Input (Optional)")
    webrtc_ctx = webrtc_streamer(
        key="voice-input",
        mode="SENDONLY",
        audio_processor_factory=AudioRecorder,
        media_stream_constraints={"audio": True, "video": False},
        async_processing=True,
    )

    # Once the stream has stopped and frames were captured, transcribe them.
    if webrtc_ctx.audio_processor and not webrtc_ctx.state.playing and webrtc_ctx.audio_processor.recorded_frames:
        st.info("Transcribing your voice...")
        frames = webrtc_ctx.audio_processor.recorded_frames
        # Join all frame arrays in a single call instead of growing the
        # array inside a loop (the original re-concatenation was O(n^2)).
        # Frames are channel-major, so concatenate along the sample axis.
        audio = np.concatenate([f.to_ndarray() for f in frames], axis=1)

        wav_path = None
        send_now = False
        try:
            # NamedTemporaryFile replaces the insecure, deprecated
            # tempfile.mktemp(); delete=False keeps the file readable after
            # the handle is closed so Whisper can consume it.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                wav_path = tmp.name
                sf.write(tmp, audio.T, samplerate=frames[0].sample_rate, format="WAV")

            # Context manager closes the audio handle (the original leaked it).
            with open(wav_path, "rb") as audio_file:
                whisper_result = client.audio.transcriptions.create(
                    model="whisper-1", file=audio_file, response_format="json"
                )

            transcript = whisper_result.text.strip()
            # The transcription response is an object, not a dict — it has no
            # .get() method (the original raised AttributeError here). Read
            # the attribute defensively; whisper-1 returns no confidence field.
            confidence = getattr(whisper_result, "confidence", "N/A")

            if transcript:
                st.success(f"Recognized: {transcript}")
                st.caption(f"π§ Confidence: {confidence}")
                if preview_toggle:
                    st.session_state.transcript_preview = transcript
                else:
                    st.session_state.messages.append({"role": "user", "content": transcript})
                    # Defer st.rerun() until after the try-block: it works by
                    # raising an exception, which the broad except below would
                    # otherwise swallow and report as a transcription failure.
                    send_now = True
        except Exception as e:
            st.error(f"β Transcription failed: {str(e)}")
        finally:
            # Remove the temp file so repeated recordings don't accumulate.
            if wav_path and os.path.exists(wav_path):
                os.remove(wav_path)
        if send_now:
            st.rerun()

    # Optional confirmation step before a transcription is sent on.
    if st.session_state.transcript_preview:
        st.markdown("---")
        st.markdown("### π Transcription Preview")
        st.markdown(f"> {st.session_state.transcript_preview}")
        if st.button("β Send to Assistant"):
            st.session_state.messages.append({"role": "user", "content": st.session_state.transcript_preview})
            st.session_state.transcript_preview = None
            st.rerun()
        if st.button("β Discard"):
            st.session_state.transcript_preview = None
            st.rerun()
# ------------------ Chat Panel (Right) ------------------
with col2:
    if prompt := st.chat_input("Type your question about the document..."):
        st.session_state.messages.append({"role": "user", "content": prompt})
        st.chat_message("user").write(prompt)

        image_found = False
        try:
            # Lazily create the assistant thread on first use and reuse it
            # for the rest of the session.
            if st.session_state.thread_id is None:
                thread = client.beta.threads.create()
                st.session_state.thread_id = thread.id
            thread_id = st.session_state.thread_id

            client.beta.threads.messages.create(thread_id=thread_id, role="user", content=prompt)
            run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=ASSISTANT_ID)

            with st.spinner("π€ Assistant is thinking..."):
                # Poll until the run reaches ANY terminal state. The original
                # loop only exited on "completed" and would spin forever when
                # a run failed, was cancelled, or expired.
                while True:
                    run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
                    if run_status.status in ("completed", "failed", "cancelled", "expired"):
                        break
                    time.sleep(1)

            if run_status.status != "completed":
                # Surface failed runs via the existing error path below.
                raise RuntimeError(f"Assistant run ended with status: {run_status.status}")

            # threads.messages.list returns newest-first, so the first
            # assistant-role entry is the latest reply. The original iterated
            # reversed(...) and therefore always showed the OLDEST reply.
            messages = client.beta.threads.messages.list(thread_id=thread_id)
            assistant_message = None
            for message in messages.data:
                if message.role == "assistant":
                    assistant_message = message.content[0].text.value
                    break

            st.chat_message("assistant").write(assistant_message)
            st.session_state.messages.append({"role": "assistant", "content": assistant_message})

            # Single-backslash escapes: in the original raw string, "\\." and
            # "[\\w\\-/]" matched a literal backslash plus a character, so the
            # pattern could never match a real URL.
            image_match = re.search(
                r'https://raw\.githubusercontent\.com/AndrewLORTech/surgical-pathology-manual/main/[\w\-/]*\.png',
                assistant_message or ""
            )
            if image_match:
                st.session_state.image_url = image_match.group(0)
                st.session_state.image_updated = True
                image_found = True
        except Exception as e:
            st.error(f"β Error: {str(e)}")
        # Rerun outside the try-block: st.rerun() raises an exception, which
        # the broad except above would otherwise swallow as "β Error".
        if image_found:
            st.rerun()

# ------------------ Chat History ------------------
# Rendered newest-first, matching the original display order.
for message in reversed(st.session_state.messages):
    role, content = message["role"], message["content"]
    st.chat_message(role).write(content)