import streamlit as st
import os
import time
import re
import queue  # needed for queue.Empty when draining the WebRTC audio receiver
import requests
import tempfile
from openai import OpenAI
from streamlit_webrtc import webrtc_streamer, WebRtcMode
import av
import numpy as np
import wave
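# Assumed runtime dependencies for this Space (a best-guess list, not pinned):
# streamlit, streamlit-webrtc, av, numpy, requests, openai; keep requirements.txt in sync.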
# ------------------ Configuration ------------------
st.set_page_config(page_title="Document AI Assistant", layout="wide")
st.title("📄 Document AI Assistant")
st.caption("Chat with an AI Assistant about your medical/pathology documents")
# ------------------ Secrets ------------------
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
ASSISTANT_ID = os.environ.get("ASSISTANT_ID")

if not OPENAI_API_KEY or not ASSISTANT_ID:
    st.error("❌ Missing secrets. Please set both OPENAI_API_KEY and ASSISTANT_ID in your Hugging Face Space settings.")
    st.stop()

client = OpenAI(api_key=OPENAI_API_KEY)
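# The Assistants API calls below (client.beta.threads...) assume the assistant
# identified by ASSISTANT_ID already exists and has the documents attached.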
# ------------------ Session State ------------------
if "messages" not in st.session_state:
    st.session_state.messages = []
if "thread_id" not in st.session_state:
    st.session_state.thread_id = None
if "image_url" not in st.session_state:
    st.session_state.image_url = None
if "audio_buffer" not in st.session_state:
    st.session_state.audio_buffer = []
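# messages: chat history shown in the UI; thread_id: Assistants API thread reused
# across turns; image_url: last page image extracted from a reply; audio_buffer:
# raw audio chunks collected from the WebRTC stream.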
# ------------------ Whisper Transcription ------------------
def transcribe_audio(file_path, api_key):
    """Send a WAV file to the OpenAI transcription endpoint and return the text."""
    with open(file_path, "rb") as f:
        response = requests.post(
            "https://api.openai.com/v1/audio/transcriptions",
            headers={"Authorization": f"Bearer {api_key}"},
            files={"file": f},
            data={"model": "whisper-1"}
        )
    response.raise_for_status()  # fail loudly on HTTP errors instead of returning None
    return response.json().get("text", None)
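# Note: the OpenAI SDK offers the same call as
# client.audio.transcriptions.create(model="whisper-1", file=f); the raw HTTP
# request is kept here to match the original approach.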
# ------------------ Audio Recorder ------------------
class AudioProcessor:
    def __init__(self):
        self.frames = []

    def recv(self, frame):
        audio = frame.to_ndarray()
        self.frames.append(audio)
        return av.AudioFrame.from_ndarray(audio, layout="mono")
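# Note: AudioProcessor follows streamlit-webrtc's callback style but is never
# wired into webrtc_streamer below; the app pulls frames from audio_receiver instead.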
def save_wav(frames, path, rate=48000):
    """Write collected int16 audio chunks to a mono 16-bit WAV file."""
    audio_data = np.concatenate(frames)
    with wave.open(path, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)  # 16-bit samples
        wf.setframerate(rate)  # 48 kHz, the usual WebRTC audio rate
        wf.writeframes(audio_data.tobytes())
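# Usage sketch (hypothetical path), combining the two helpers above:
#     save_wav(st.session_state.audio_buffer, "/tmp/question.wav")
#     text = transcribe_audio("/tmp/question.wav", OPENAI_API_KEY)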
# ------------------ Sidebar & Image Panel ------------------ | |
st.sidebar.header("π§ Settings") | |
if st.sidebar.button("π Clear Chat"): | |
st.session_state.messages = [] | |
st.session_state.thread_id = None | |
st.session_state.image_url = None | |
st.rerun() | |
show_image = st.sidebar.checkbox("π Show Document Image", value=True) | |
col1, col2 = st.columns([1, 2])

with col1:
    if show_image and st.session_state.image_url:
        st.image(st.session_state.image_url, caption="📄 Extracted Page", use_container_width=True)
# ------------------ Chat & Voice Panel ------------------
with col2:
    for message in st.session_state.messages:
        st.chat_message(message["role"]).write(message["content"])

    # 🎤 Real-time voice recorder
    st.subheader("🎙️ Ask with your voice")
    audio_ctx = webrtc_streamer(
        key="speech",
        mode=WebRtcMode.SENDONLY,
        # audio-only capture; replaces the removed in_audio_enabled flag
        media_stream_constraints={"video": False, "audio": True},
        audio_receiver_size=256
    )
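    # While streaming, frames queue up in audio_ctx.audio_receiver and are
    # drained on each Streamlit rerun below.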
    if audio_ctx.audio_receiver:
        try:
            # get_frames() (as in streamlit-webrtc's speech-to-text example)
            # drains the receiver queue; it replaces the original per-frame recv() call
            audio_frames = audio_ctx.audio_receiver.get_frames(timeout=1)
        except queue.Empty:
            audio_frames = []

        for frame in audio_frames:
            st.session_state.audio_buffer.append(frame.to_ndarray())

        # ⏱️ Auto-stop after roughly 3 seconds of buffered chunks
        if len(st.session_state.audio_buffer) > 30:
            tmp_path = tempfile.NamedTemporaryFile(delete=False, suffix=".wav").name
            save_wav(st.session_state.audio_buffer, tmp_path)
            st.session_state.audio_buffer = []

            with st.spinner("🧠 Transcribing..."):
                transcript = transcribe_audio(tmp_path, OPENAI_API_KEY)

            if transcript:
                st.success("📝 " + transcript)
                st.session_state.messages.append({"role": "user", "content": transcript})
                st.chat_message("user").write(transcript)
                prompt = transcript
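                # Hand the transcript to the Assistants API exactly as a typed
                # prompt would be handled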
                try:
                    # Create one thread per session and reuse it across turns
                    if st.session_state.thread_id is None:
                        thread = client.beta.threads.create()
                        st.session_state.thread_id = thread.id
                    thread_id = st.session_state.thread_id

                    client.beta.threads.messages.create(
                        thread_id=thread_id,
                        role="user",
                        content=prompt
                    )

                    run = client.beta.threads.runs.create(
                        thread_id=thread_id,
                        assistant_id=ASSISTANT_ID
                    )

                    with st.spinner("Assistant is thinking..."):
                        while True:
                            run_status = client.beta.threads.runs.retrieve(
                                thread_id=thread_id,
                                run_id=run.id
                            )
                            if run_status.status == "completed":
                                break
                            # bail out on terminal states instead of polling forever
                            if run_status.status in ("failed", "cancelled", "expired"):
                                raise RuntimeError(f"Run ended with status: {run_status.status}")
                            time.sleep(1)

                    # messages.list() returns newest first, so the first assistant
                    # message encountered is the latest reply (the original reversed()
                    # scan picked the oldest one instead)
                    messages = client.beta.threads.messages.list(thread_id=thread_id)
                    assistant_message = None
                    for message in messages.data:
                        if message.role == "assistant":
                            assistant_message = message.content[0].text.value
                            break

                    st.chat_message("assistant").write(assistant_message)
                    st.session_state.messages.append({"role": "assistant", "content": assistant_message})

                    # Show the page image if the reply links one from the manual repo
                    image_match = re.search(
                        r'https://raw\.githubusercontent\.com/AndrewLORTech/surgical-pathology-manual/main/[\w\-/]*\.png',
                        assistant_message
                    )
                    if image_match:
                        st.session_state.image_url = image_match.group(0)

                except Exception as e:
                    st.error(f"❌ Error: {e}")
# Fallback text input
if prompt := st.chat_input("💬 Or type your question..."):
    st.session_state.messages.append({"role": "user", "content": prompt})
    st.chat_message("user").write(prompt)
    # You can add assistant logic here if you want it to run immediately
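# A minimal sketch of wiring typed questions into the same Assistants flow,
# assuming the run block above were factored into a run_assistant(prompt)
# helper (hypothetical; no such helper exists in this file yet):
#
#     if prompt := st.chat_input("💬 Or type your question..."):
#         st.session_state.messages.append({"role": "user", "content": prompt})
#         st.chat_message("user").write(prompt)
#         reply = run_assistant(prompt)
#         st.session_state.messages.append({"role": "assistant", "content": reply})
#         st.chat_message("assistant").write(reply)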