# documentaitest / app.py
import streamlit as st
from streamlit_webrtc import webrtc_streamer, AudioProcessorBase, WebRtcMode
import av
import numpy as np
import tempfile
import soundfile as sf
import os
import time
import re
from openai import OpenAI
# ------------------ Audio Processor ------------------
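# AudioRecorder buffers every frame the browser sends over WebRTC; the buffered
# frames are stitched into a WAV file for transcription once recording stops.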
class AudioRecorder(AudioProcessorBase):
def __init__(self):
self.recorded_frames = []
def recv(self, frame: av.AudioFrame) -> av.AudioFrame:
self.recorded_frames.append(frame)
return frame
# ------------------ App Configuration ------------------
st.set_page_config(page_title="Document AI Assistant", layout="wide")
st.title("πŸ“„ Document AI Assistant")
st.caption("Chat with an AI Assistant on your medical/pathology documents")
# ------------------ Load API Key and Assistant ID from Hugging Face Secrets ------------------
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
ASSISTANT_ID = os.environ.get("ASSISTANT_ID")
# ------------------ Error Handling for Missing Secrets ------------------
if not OPENAI_API_KEY or not ASSISTANT_ID:
st.error("❌ Missing secrets. Please ensure both OPENAI_API_KEY and ASSISTANT_ID are set in your Hugging Face Space secrets.")
st.stop()
client = OpenAI(api_key=OPENAI_API_KEY)
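# A single client instance serves both the Whisper transcription call and the
# Assistants API calls below.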
# ------------------ Session State Initialization ------------------
if "messages" not in st.session_state:
st.session_state.messages = []
if "thread_id" not in st.session_state:
st.session_state.thread_id = None
if "image_url" not in st.session_state:
st.session_state.image_url = None
if "image_updated" not in st.session_state:
st.session_state.image_updated = False
if "transcript_preview" not in st.session_state:
st.session_state.transcript_preview = None
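# thread_id pins the chat to a single Assistants API thread; image_url and
# image_updated drive the left-hand page viewer; transcript_preview holds a
# voice transcription awaiting user confirmation.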
# ------------------ Sidebar Controls ------------------
st.sidebar.header("πŸ”§ Settings")
if st.sidebar.button("πŸ”„ Clear Chat"):
st.session_state.messages = []
st.session_state.thread_id = None
st.session_state.image_url = None
st.session_state.image_updated = False
st.session_state.transcript_preview = None
st.rerun()
show_image = st.sidebar.checkbox("📖 Show Document Image", value=True)
preview_toggle = st.sidebar.checkbox("📝 Preview transcription before sending", value=True)
# ------------------ Split Layout ------------------
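# col1 (left) shows the extracted document page; col2 (right) stacks the voice
# input widget, the transcription preview, and the chat panel.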
col1, col2 = st.columns([1, 2])
# ------------------ Image Panel (Left) ------------------
with col1:
if show_image and st.session_state.image_url:
        st.image(st.session_state.image_url, caption="📑 Extracted Page", use_container_width=True)
st.session_state.image_updated = False
# ------------------ Voice Input Processing ------------------
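# Flow: streamlit-webrtc streams microphone audio into AudioRecorder; once the
# user stops the stream, the buffered frames are written to a WAV file and sent
# to Whisper for transcription.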
with col2:
st.markdown("### πŸŽ™οΈ Voice Input (Optional)")
webrtc_ctx = webrtc_streamer(
key="voice-input",
mode="SENDONLY",
audio_processor_factory=AudioRecorder,
media_stream_constraints={"audio": True, "video": False},
async_processing=True,
)
if webrtc_ctx.audio_processor and not webrtc_ctx.state.playing and webrtc_ctx.audio_processor.recorded_frames:
st.info("Transcribing your voice...")
        # Stitch the buffered frames into one array and write it to a temporary WAV
        # file (mkstemp avoids the race condition of the deprecated tempfile.mktemp).
        frames = webrtc_ctx.audio_processor.recorded_frames
        audio = np.concatenate([frame.to_ndarray() for frame in frames], axis=1)
        wav_fd, wav_path = tempfile.mkstemp(suffix=".wav")
        os.close(wav_fd)
        sf.write(wav_path, audio.T, samplerate=frames[0].sample_rate, format="WAV")
        try:
            # Keep the file handle in a context manager so it is always closed.
            with open(wav_path, "rb") as audio_file:
                whisper_result = client.audio.transcriptions.create(model="whisper-1", file=audio_file, response_format="json")
            transcript = whisper_result.text.strip()
            # The response is a model object, not a dict, so .get() would raise;
            # whisper-1's json response carries no confidence field, so this
            # usually falls back to "N/A".
            confidence = getattr(whisper_result, "confidence", "N/A")
if transcript:
st.success(f"Recognized: {transcript}")
st.caption(f"🧠 Confidence: {confidence}")
if preview_toggle:
st.session_state.transcript_preview = transcript
else:
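                    # Note: this only adds the transcript to the visible history;
                    # it is not forwarded to the assistant thread.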
st.session_state.messages.append({"role": "user", "content": transcript})
st.rerun()
except Exception as e:
st.error(f"❌ Transcription failed: {str(e)}")
if st.session_state.transcript_preview:
st.markdown("---")
st.markdown("### πŸ“ Transcription Preview")
st.markdown(f"> {st.session_state.transcript_preview}")
if st.button("βœ… Send to Assistant"):
st.session_state.messages.append({"role": "user", "content": st.session_state.transcript_preview})
st.session_state.transcript_preview = None
st.rerun()
if st.button("❌ Discard"):
st.session_state.transcript_preview = None
st.rerun()
# ------------------ Chat Panel (Right) ------------------
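# Flow: a thread is created lazily on the first prompt, the user message is
# appended to it, and a run against ASSISTANT_ID is polled until it finishes.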
with col2:
if prompt := st.chat_input("Type your question about the document..."):
st.session_state.messages.append({"role": "user", "content": prompt})
st.chat_message("user").write(prompt)
try:
if st.session_state.thread_id is None:
thread = client.beta.threads.create()
st.session_state.thread_id = thread.id
thread_id = st.session_state.thread_id
client.beta.threads.messages.create(thread_id=thread_id, role="user", content=prompt)
run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=ASSISTANT_ID)
with st.spinner("πŸ€– Assistant is thinking..."):
                while True:
                    run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
                    if run_status.status == "completed":
                        break
                    # Surface terminal failure states instead of polling forever.
                    if run_status.status in ("failed", "cancelled", "expired"):
                        raise RuntimeError(f"Assistant run ended with status: {run_status.status}")
                    time.sleep(1)
            messages = client.beta.threads.messages.list(thread_id=thread_id)
            # messages.list returns newest first, so the first assistant entry is
            # the latest reply; reversed() would have picked the oldest one.
            assistant_message = None
            for message in messages.data:
                if message.role == "assistant":
                    assistant_message = message.content[0].text.value
                    break
st.chat_message("assistant").write(assistant_message)
st.session_state.messages.append({"role": "assistant", "content": assistant_message})
            # Single backslashes: in a raw string, a doubled backslash matches a
            # literal backslash, so the original pattern could never match the URL.
            image_match = re.search(
                r'https://raw\.githubusercontent\.com/AndrewLORTech/surgical-pathology-manual/main/[\w\-/]*\.png',
                assistant_message
            )
if image_match:
st.session_state.image_url = image_match.group(0)
st.session_state.image_updated = True
st.rerun()
except Exception as e:
st.error(f"❌ Error: {str(e)}")
for message in reversed(st.session_state.messages):
role, content = message["role"], message["content"]
st.chat_message(role).write(content)