noumanjavaid commited on
Commit
7834426
Β·
verified Β·
1 Parent(s): 1f99600

Update src/streamlit_app.py

Browse files
Files changed (1) hide show
  1. src/streamlit_app.py +306 -398
src/streamlit_app.py CHANGED
@@ -6,42 +6,47 @@ import base64
6
  import io
7
  import threading
8
  import traceback
9
- import time # For potential delays or timestamps if needed in future
10
- import atexit # For cleanup actions on exit
11
-
12
  from dotenv import load_dotenv
13
 
14
- # --- Import main libraries ---
15
- import cv2
16
- import pyaudio
17
  import PIL.Image
18
- import mss # For screen capture
19
 
20
  from google import genai
21
  from google.genai import types
22
 
 
 
 
 
 
 
 
 
 
 
23
  # --- Configuration ---
24
- load_dotenv() # Load environment variables from .env file
 
25
 
26
  # Audio configuration
27
- FORMAT = pyaudio.paInt16
28
- CHANNELS = 1
29
- SEND_SAMPLE_RATE = 16000
30
- RECEIVE_SAMPLE_RATE = 24000 # Gemini documentation recommendation
31
- CHUNK_SIZE = 1024
32
- AUDIO_PLAYBACK_QUEUE_MAXSIZE = 50 # Buffer for audio chunks from Gemini for playback
33
- MEDIA_OUT_QUEUE_MAXSIZE = 20 # Buffer for audio/video frames to send to Gemini
 
34
 
35
  # Video configuration
36
- VIDEO_FPS_LIMIT = 1 # Send 1 frame per second to the API
37
- VIDEO_PREVIEW_RESIZE = (640, 480) # Size for Streamlit preview display
38
- VIDEO_API_RESIZE = (1024, 1024) # Max size for images sent to API (aspect ratio preserved)
39
 
40
- # Gemini model configuration
41
- MODEL_NAME = "models/gemini-2.0-flash-live-001" # VERIFY THIS MODEL NAME IS CORRECT FOR LIVE API
42
- DEFAULT_VIDEO_MODE = "camera" # Default video input mode ("camera", "screen", "none")
43
 
44
- # System Prompt for the Medical Assistant
45
  MEDICAL_ASSISTANT_SYSTEM_PROMPT = """You are an AI Medical Assistant. Your primary function is to analyze visual information from the user's camera or screen and respond via voice.
46
 
47
  Your responsibilities are:
@@ -58,467 +63,370 @@ Your responsibilities are:
58
  Example of a disclaimer you might use: "As an AI assistant, I can describe what I see, but I can't provide medical advice or diagnoses. For any health concerns, it's always best to speak with a doctor or other healthcare professional."
59
  """
60
 
61
- # Initialize PyAudio instance globally
62
  pya = pyaudio.PyAudio()
63
-
64
- # Ensure PyAudio is terminated on exit
65
  def cleanup_pyaudio():
66
- print("Terminating PyAudio instance.")
67
- pya.terminate()
 
68
  atexit.register(cleanup_pyaudio)
69
 
 
 
 
 
70
 
71
- # Initialize Streamlit session state variables
72
- def init_session_state():
73
- defaults = {
74
- 'app_initialized': True, # To ensure this runs only once per session
75
- 'session_active': False,
76
- 'audio_loop_instance': None,
77
- 'chat_messages': [],
78
- 'current_frame_preview': None,
79
- 'video_mode_selection': DEFAULT_VIDEO_MODE,
80
- }
81
- for key, value in defaults.items():
82
- if key not in st.session_state:
83
- st.session_state[key] = value
84
-
85
- init_session_state()
86
-
87
-
88
- # Configure Streamlit page
89
- st.set_page_config(page_title="Live Medical Assistant", layout="wide")
90
-
91
- # Initialize Gemini client
92
  GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
93
- if not GEMINI_API_KEY:
94
- st.error("GEMINI_API_KEY not found. Please set it in your environment variables or a .env file.")
 
 
 
 
 
 
 
 
 
 
95
  st.stop()
96
 
97
- # Use 'client' consistently for the genai.Client instance
98
- client = None # Define client in the global scope
99
- try:
100
- client = genai.Client(
101
- http_options={"api_version": "v1beta"}, # Required for live
102
- api_key=GEMINI_API_KEY,
103
- )
104
- except Exception as e:
105
- st.error(f"Failed to initialize Gemini client: {e}")
106
- st.stop()
107
-
108
- # Gemini LiveConnectConfig
109
  LIVE_CONNECT_CONFIG = types.LiveConnectConfig(
110
- response_modalities=["audio", "text"], # Expect both audio and text responses
111
  speech_config=types.SpeechConfig(
112
- voice_config=types.VoiceConfig(
113
- prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr") # Changed voice to Zephyr
114
- )
115
  ),
116
  )
117
 
118
- class AudioVisualLoop:
119
- def __init__(self, video_mode_setting=DEFAULT_VIDEO_MODE):
120
- self.video_mode = video_mode_setting
121
  self.gemini_session = None
122
- self.async_event_loop = None # To store the loop for thread-safe calls
123
-
124
- self.audio_playback_queue = None # asyncio.Queue for audio from Gemini
125
- self.media_to_gemini_queue = None # asyncio.Queue for audio/video to Gemini
126
-
127
- self.is_running = True # Flag to control all async tasks
128
- self.mic_stream = None # PyAudio input stream
129
- self.playback_stream = None # PyAudio output stream
130
- self.camera_capture = None # OpenCV VideoCapture object
131
 
132
  async def send_text_input_to_gemini(self, user_text):
133
  if not user_text or not self.gemini_session or not self.is_running:
134
- print("Warning: Cannot send text. Session not active or no text.")
135
  return
136
  try:
137
- print(f"Sending text to Gemini: {user_text}")
138
  await self.gemini_session.send(input=user_text, end_of_turn=True)
139
  except Exception as e:
140
- st.error(f"Error sending text message to Gemini: {e}")
141
- print(f"Traceback for send_text_input_to_gemini: {traceback.format_exc()}")
142
-
143
- def _process_camera_frame(self):
144
- if not self.camera_capture or not self.camera_capture.isOpened():
145
- print("Camera not available or not open.")
146
- return None
147
- ret, frame = self.camera_capture.read()
148
- if not ret:
149
- print("Failed to read frame from camera.")
150
- return None
151
-
152
- frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
153
- img = PIL.Image.fromarray(frame_rgb)
154
-
155
- preview_img = img.copy()
156
- preview_img.thumbnail(VIDEO_PREVIEW_RESIZE)
157
-
158
- api_img = img.copy()
159
- api_img.thumbnail(VIDEO_API_RESIZE) # Preserves aspect ratio
160
-
161
- image_io = io.BytesIO()
162
- api_img.save(image_io, format="jpeg")
163
- image_bytes = image_io.getvalue()
164
-
165
- return {
166
- "preview": preview_img,
167
- "api_data": {"mime_type": "image/jpeg", "data": base64.b64encode(image_bytes).decode()}
168
- }
169
-
170
- async def stream_camera_frames(self):
171
- try:
172
- self.camera_capture = await asyncio.to_thread(cv2.VideoCapture, 0) # 0 for default camera
173
- if not self.camera_capture.isOpened():
174
- st.error("Could not open camera. Please check permissions and availability.")
175
- self.is_running = False
176
- return
177
-
178
- while self.is_running:
179
- frame_data = await asyncio.to_thread(self._process_camera_frame)
180
- if frame_data:
181
- st.session_state['current_frame_preview'] = frame_data["preview"]
182
- if self.media_to_gemini_queue.full():
183
- await self.media_to_gemini_queue.get() # Make space
184
- await self.media_to_gemini_queue.put(frame_data["api_data"])
185
- await asyncio.sleep(1.0 / VIDEO_FPS_LIMIT)
186
- except Exception as e:
187
- # st.error(f"Camera streaming error: {e}") # Avoid st.error from non-main thread if problematic
188
- print(f"Camera streaming error: {e}\nTraceback: {traceback.format_exc()}")
189
- self.is_running = False
190
- finally:
191
- if self.camera_capture:
192
- await asyncio.to_thread(self.camera_capture.release)
193
- self.camera_capture = None
194
- print("Camera streaming task finished.")
195
-
196
- def _process_screen_frame(self):
197
- with mss.mss() as sct:
198
- monitor_index = 1
199
- if len(sct.monitors) <= monitor_index:
200
- monitor_index = 0
201
- monitor = sct.monitors[monitor_index]
202
-
203
- sct_img = sct.grab(monitor)
204
- img = PIL.Image.frombytes("RGB", (sct_img.width, sct_img.height), sct_img.rgb)
205
-
206
- preview_img = img.copy()
207
- preview_img.thumbnail(VIDEO_PREVIEW_RESIZE)
208
-
209
- api_img = img.copy()
210
- api_img.thumbnail(VIDEO_API_RESIZE)
211
-
212
- image_io = io.BytesIO()
213
- api_img.save(image_io, format="jpeg")
214
- image_bytes = image_io.getvalue()
215
-
216
- return {
217
- "preview": preview_img,
218
- "api_data": {"mime_type": "image/jpeg", "data": base64.b64encode(image_bytes).decode()}
219
- }
220
-
221
- async def stream_screen_frames(self):
222
- try:
223
- while self.is_running:
224
- frame_data = await asyncio.to_thread(self._process_screen_frame)
225
- if frame_data:
226
- st.session_state['current_frame_preview'] = frame_data["preview"]
227
- if self.media_to_gemini_queue.full():
228
- await self.media_to_gemini_queue.get()
229
- await self.media_to_gemini_queue.put(frame_data["api_data"])
230
- await asyncio.sleep(1.0 / VIDEO_FPS_LIMIT)
231
- except Exception as e:
232
- # st.error(f"Screen capture error: {e}")
233
- print(f"Screen capture error: {e}\nTraceback: {traceback.format_exc()}")
234
- self.is_running = False
235
- finally:
236
- print("Screen streaming task finished.")
237
 
238
  async def stream_media_to_gemini(self):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
239
  try:
240
  while self.is_running:
241
- if not self.gemini_session:
242
- await asyncio.sleep(0.1)
243
- continue
244
- try:
245
- media_chunk = await asyncio.wait_for(self.media_to_gemini_queue.get(), timeout=1.0)
246
- if media_chunk and self.gemini_session and self.is_running:
247
- await self.gemini_session.send(input=media_chunk)
248
- if media_chunk: # Avoid task_done on None if used as sentinel
249
- self.media_to_gemini_queue.task_done()
250
- except asyncio.TimeoutError:
251
- continue
252
- except Exception as e:
253
- if self.is_running:
254
- print(f"Error in stream_media_to_gemini: {e}")
255
- await asyncio.sleep(0.1)
256
- except asyncio.CancelledError:
257
- print("stream_media_to_gemini task cancelled.")
258
- finally:
259
- print("Media streaming to Gemini task finished.")
260
-
261
- async def capture_microphone_audio(self):
262
- try:
263
- mic_info = await asyncio.to_thread(pya.get_default_input_device_info)
264
- self.mic_stream = await asyncio.to_thread(
265
- pya.open,
266
- format=FORMAT, channels=CHANNELS, rate=SEND_SAMPLE_RATE,
267
- input=True, input_device_index=mic_info["index"],
268
- frames_per_buffer=CHUNK_SIZE,
269
- )
270
- print("Microphone stream opened.")
271
- while self.is_running:
272
- try:
273
- audio_data = await asyncio.to_thread(self.mic_stream.read, CHUNK_SIZE, exception_on_overflow=False)
274
- if self.media_to_gemini_queue.full():
275
- await self.media_to_gemini_queue.get()
276
- await self.media_to_gemini_queue.put({"data": audio_data, "mime_type": "audio/pcm"})
277
- except IOError as e:
278
- if e.errno == pyaudio.paInputOverflowed: # type: ignore
279
- print("Microphone Input overflowed. Skipping.")
280
- else:
281
- print(f"Microphone read IOError: {e}")
282
- self.is_running = False; break
283
- except Exception as e:
284
- print(f"Error in capture_microphone_audio: {e}")
285
- await asyncio.sleep(0.01)
286
- except Exception as e:
287
- # st.error(f"Failed to open microphone: {e}. Please check permissions.")
288
- print(f"Failed to open microphone: {e}\nTraceback: {traceback.format_exc()}")
289
- self.is_running = False
290
- finally:
291
- if self.mic_stream:
292
- await asyncio.to_thread(self.mic_stream.stop_stream)
293
- await asyncio.to_thread(self.mic_stream.close)
294
- self.mic_stream = None
295
- print("Microphone capture task finished.")
296
 
297
  async def process_gemini_responses(self):
 
298
  try:
299
  while self.is_running:
300
- if not self.gemini_session:
301
  await asyncio.sleep(0.1); continue
302
  try:
303
- turn_response = self.gemini_session.receive()
304
  async for chunk in turn_response:
305
  if not self.is_running: break
306
- if audio_data := chunk.data:
307
- if not self.audio_playback_queue.full():
308
- self.audio_playback_queue.put_nowait(audio_data)
309
- else:
310
- print("Audio playback queue full, discarding data.")
311
- if text_response := chunk.text:
312
- # Schedule Streamlit update from the main thread if possible,
313
- # or use st.session_state and rely on rerun.
314
- st.session_state['chat_messages'] = st.session_state['chat_messages'] + [{"role": "assistant", "content": text_response}]
315
- # If immediate UI update is needed and safe:
316
- # st.experimental_rerun() # Use with caution from background threads
317
- except types.generation_types.StopCandidateException:
318
- print("Gemini response stream ended (StopCandidateException).")
319
  except Exception as e:
320
- if self.is_running:
321
- print(f"Error receiving from Gemini: {e}")
322
- await asyncio.sleep(0.1)
323
- except asyncio.CancelledError:
324
- print("process_gemini_responses task cancelled.")
325
- finally:
326
- print("Gemini response processing task finished.")
327
 
328
  async def play_gemini_audio(self):
 
329
  try:
330
  self.playback_stream = await asyncio.to_thread(
331
- pya.open, format=FORMAT, channels=CHANNELS, rate=RECEIVE_SAMPLE_RATE, output=True
332
  )
333
- print("Audio playback stream opened.")
334
  while self.is_running:
335
  try:
336
- audio_chunk = await asyncio.wait_for(self.audio_playback_queue.get(), timeout=1.0)
337
- if audio_chunk: # Check if not None (sentinel)
338
  await asyncio.to_thread(self.playback_stream.write, audio_chunk)
339
- if audio_chunk: # Avoid task_done on None
340
- self.audio_playback_queue.task_done()
341
- except asyncio.TimeoutError:
342
- continue
343
- except Exception as e:
344
- print(f"Error playing audio: {e}")
345
- await asyncio.sleep(0.01)
346
  except Exception as e:
347
- # st.error(f"Failed to open audio playback stream: {e}")
348
- print(f"Failed to open audio playback stream: {e}\nTraceback: {traceback.format_exc()}")
349
  self.is_running = False
350
  finally:
351
  if self.playback_stream:
 
352
  await asyncio.to_thread(self.playback_stream.stop_stream)
353
  await asyncio.to_thread(self.playback_stream.close)
354
  self.playback_stream = None
355
- print("Audio playback task finished.")
356
 
357
  def signal_stop(self):
358
- print("Signal to stop AudioVisualLoop received.")
359
  self.is_running = False
360
- if self.media_to_gemini_queue: self.media_to_gemini_queue.put_nowait(None)
361
- if self.audio_playback_queue: self.audio_playback_queue.put_nowait(None)
 
 
 
362
 
363
  async def run_main_loop(self):
364
  self.async_event_loop = asyncio.get_running_loop()
365
  self.is_running = True
366
- # st.session_state['session_active'] = True # Set by caller in Streamlit thread
367
-
368
- print("AudioVisualLoop starting...")
369
- # Ensure client is not None before using (should be handled by global init)
370
  if client is None:
371
- print("Error: Gemini client is not initialized.")
372
- st.error("Critical Error: Gemini client failed to initialize. Cannot start session.")
373
- st.session_state['session_active'] = False
374
  return
375
 
376
  try:
377
- # Use the global 'client' instance
378
  async with client.aio.live.connect(model=MODEL_NAME, config=LIVE_CONNECT_CONFIG) as session:
379
  self.gemini_session = session
380
- print("Gemini session established.")
381
-
382
  try:
383
- print("Sending system prompt to Gemini...")
384
  await self.gemini_session.send(input=MEDICAL_ASSISTANT_SYSTEM_PROMPT, end_of_turn=False)
385
- print("System prompt sent successfully.")
386
  except Exception as e:
387
- # st.error(f"Failed to send system prompt to Gemini: {e}")
388
- print(f"Failed to send system prompt to Gemini: {e}\nTraceback: {traceback.format_exc()}")
389
- self.is_running = False
390
- return
391
-
392
- self.audio_playback_queue = asyncio.Queue(maxsize=AUDIO_PLAYBACK_QUEUE_MAXSIZE)
393
- self.media_to_gemini_queue = asyncio.Queue(maxsize=MEDIA_OUT_QUEUE_MAXSIZE)
394
 
395
  async with asyncio.TaskGroup() as tg:
396
- print("Creating async tasks...")
397
  tg.create_task(self.stream_media_to_gemini(), name="stream_media_to_gemini")
398
- tg.create_task(self.capture_microphone_audio(), name="capture_microphone_audio")
399
-
400
- if self.video_mode == "camera":
401
- tg.create_task(self.stream_camera_frames(), name="stream_camera_frames")
402
- elif self.video_mode == "screen":
403
- tg.create_task(self.stream_screen_frames(), name="stream_screen_frames")
404
-
405
  tg.create_task(self.process_gemini_responses(), name="process_gemini_responses")
406
  tg.create_task(self.play_gemini_audio(), name="play_gemini_audio")
407
- print("All async tasks created in TaskGroup.")
408
-
409
- print("TaskGroup finished execution.")
410
-
411
- except asyncio.CancelledError:
412
- print("AudioVisualLoop.run_main_loop() was cancelled.")
413
  except ExceptionGroup as eg:
414
- # st.error(f"An error occurred in one of the concurrent tasks: {eg.exceptions[0]}")
415
- print(f"ExceptionGroup caught in AudioVisualLoop: {eg}")
416
  for i, exc in enumerate(eg.exceptions):
417
- print(f" Task Exception {i+1}/{len(eg.exceptions)}: {type(exc).__name__}: {exc}")
418
- traceback.print_exception(type(exc), exc, exc.__traceback__)
419
- except Exception as e: # This catches the ConnectionClosedError
420
- # st.error(f"A critical error occurred in the main session loop: {e}")
421
- print(f"General Exception in AudioVisualLoop: {type(e).__name__}: {e}")
422
- traceback.print_exception(e) # Print full traceback for debugging
423
  finally:
424
- print("AudioVisualLoop.run_main_loop() finishing...")
425
- self.is_running = False
426
- # st.session_state['session_active'] = False # Set by caller in Streamlit thread
427
- self.gemini_session = None
428
- print("AudioVisualLoop finished.")
 
 
 
 
 
 
 
 
 
 
 
 
429
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
430
 
431
- # --- Streamlit UI ---
432
  def run_streamlit_app():
 
 
 
433
  st.title("Live AI Medical Assistant")
434
-
 
435
  with st.sidebar:
436
  st.header("Session Control")
437
- video_mode_options = ["camera", "screen", "none"]
438
- current_video_mode = st.session_state.get('video_mode_selection', DEFAULT_VIDEO_MODE)
439
- selected_video_mode = st.selectbox(
440
- "Video Source:",
441
- video_mode_options,
442
- index=video_mode_options.index(current_video_mode),
443
- disabled=st.session_state.get('session_active', False)
444
- )
445
- if selected_video_mode != current_video_mode: # Update if changed and not active
446
- st.session_state['video_mode_selection'] = selected_video_mode
447
-
448
- if not st.session_state.get('session_active', False):
449
- if st.button("πŸš€ Start Session", type="primary", use_container_width=True):
450
- st.session_state['session_active'] = True # Set active before starting thread
451
- st.session_state['chat_messages'] = [{
452
- "role": "system",
453
- "content": (
454
- "Medical Assistant is activating. The AI has been instructed on its role to visually assist you. "
455
- "Remember: This AI cannot provide medical diagnoses or replace consultation with a healthcare professional."
456
- )
457
- }]
458
- st.session_state['current_frame_preview'] = None
459
-
460
- audio_loop_instance = AudioVisualLoop(video_mode_setting=st.session_state['video_mode_selection'])
461
- st.session_state['audio_loop_instance'] = audio_loop_instance
462
 
463
- threading.Thread(target=lambda: asyncio.run(audio_loop_instance.run_main_loop()), daemon=True).start()
464
- st.success("Session starting... Please wait for initialization.")
465
- time.sleep(1)
 
 
 
 
 
 
 
466
  st.rerun()
467
- else:
468
- if st.button("οΏ½οΏ½οΏ½ Stop Session", type="secondary", use_container_width=True):
469
- if st.session_state.get('audio_loop_instance'):
470
- st.session_state['audio_loop_instance'].signal_stop()
471
- st.session_state['audio_loop_instance'] = None
472
- st.session_state['session_active'] = False # Set inactive
473
- st.warning("Session stopping... Please wait.")
474
- time.sleep(1)
475
  st.rerun()
476
-
477
- col_video, col_chat = st.columns([2, 3])
478
-
479
- with col_video:
480
- st.subheader("Live Feed")
481
- if st.session_state.get('session_active', False) and st.session_state.get('video_mode_selection', DEFAULT_VIDEO_MODE) != "none":
482
- if st.session_state.get('current_frame_preview') is not None:
483
- st.image(st.session_state['current_frame_preview'], caption="Live Video Feed", use_column_width=True)
484
- else:
485
- st.info("Waiting for video feed...")
486
- elif st.session_state.get('video_mode_selection', DEFAULT_VIDEO_MODE) != "none":
487
- st.info("Video feed will appear here once the session starts.")
488
- else:
489
- st.info("Video input is disabled for this session.")
490
-
491
- with col_chat:
492
- st.subheader("Chat with Assistant")
493
- # Display chat messages from session state
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
494
  for msg in st.session_state.get('chat_messages', []):
495
  with st.chat_message(msg["role"]):
496
  st.write(msg["content"])
497
-
498
- user_chat_input = st.chat_input(
499
- "Type your message or ask about the video...",
500
- key="user_chat_input_box",
501
- disabled=not st.session_state.get('session_active', False)
502
- )
503
 
504
- if user_chat_input:
505
- # Append user message and rerun to display it immediately
506
- st.session_state['chat_messages'] = st.session_state.get('chat_messages', []) + [{"role": "user", "content": user_chat_input}]
507
-
508
- loop_instance = st.session_state.get('audio_loop_instance')
509
- if loop_instance and loop_instance.async_event_loop and loop_instance.gemini_session:
510
- if loop_instance.async_event_loop.is_running():
511
- asyncio.run_coroutine_threadsafe(
512
- loop_instance.send_text_input_to_gemini(user_chat_input),
513
- loop_instance.async_event_loop
514
- )
515
- else:
516
- st.error("Session event loop is not running. Cannot send message.")
517
- elif not loop_instance or not st.session_state.get('session_active', False):
518
- st.error("Session is not active. Please start a session to send messages.")
519
- else:
520
- st.warning("Session components not fully ready. Please wait a moment.")
521
- st.rerun() # Rerun to show user message and any quick AI text response
522
 
523
  if __name__ == "__main__":
524
- run_streamlit_app()
 
 
 
 
6
  import io
7
  import threading
8
  import traceback
9
+ import time
10
+ import logging
 
11
  from dotenv import load_dotenv
12
 
13
+ import cv2 # For image processing
14
+ import pyaudio # For audio PLAYBACK
 
15
  import PIL.Image
 
16
 
17
  from google import genai
18
  from google.genai import types
19
 
20
+ # streamlit-webrtc components
21
+ from streamlit_webrtc import (
22
+ webrtc_streamer,
23
+ WebRtcMode,
24
+ AudioProcessorBase,
25
+ VideoProcessorBase,
26
+ ClientSettings
27
+ )
28
+ from aiortc import RTCIceServer, RTCConfiguration # For STUN server configuration
29
+
30
  # --- Configuration ---
31
+ load_dotenv()
32
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s')
33
 
34
  # Audio configuration
35
+ PYAUDIO_FORMAT = pyaudio.paInt16 # For PyAudio playback
36
+ PYAUDIO_CHANNELS = 1 # For PyAudio playback
37
+ WEBRTC_REQUESTED_AUDIO_CHANNELS = 1 # Request mono audio from WebRTC
38
+ WEBRTC_REQUESTED_SEND_SAMPLE_RATE = 16000 # Target sample rate for audio sent to Gemini
39
+ GEMINI_AUDIO_RECEIVE_SAMPLE_RATE = 24000 # Gemini documentation recommendation for its TTS
40
+ PYAUDIO_PLAYBACK_CHUNK_SIZE = 1024 # For PyAudio playback
41
+ AUDIO_PLAYBACK_QUEUE_MAXSIZE = 50
42
+ MEDIA_TO_GEMINI_QUEUE_MAXSIZE = 30
43
 
44
  # Video configuration
45
+ VIDEO_FPS_TO_GEMINI = 2 # Target FPS to send to Gemini (increased slightly)
46
+ VIDEO_API_RESIZE = (1024, 1024) # Max size for images sent to API
 
47
 
48
+ MODEL_NAME = "models/gemini-2.0-flash-live-001"
 
 
49
 
 
50
  MEDICAL_ASSISTANT_SYSTEM_PROMPT = """You are an AI Medical Assistant. Your primary function is to analyze visual information from the user's camera or screen and respond via voice.
51
 
52
  Your responsibilities are:
 
63
  Example of a disclaimer you might use: "As an AI assistant, I can describe what I see, but I can't provide medical advice or diagnoses. For any health concerns, it's always best to speak with a doctor or other healthcare professional."
64
  """
65
 
66
+ # --- PyAudio Global Instance and Cleanup ---
67
  pya = pyaudio.PyAudio()
 
 
68
  def cleanup_pyaudio():
69
+ logging.info("Terminating PyAudio instance.")
70
+ if pya: # Check if pya is not None
71
+ pya.terminate()
72
  atexit.register(cleanup_pyaudio)
73
 
74
+ # --- Global Queues for WebRTC to Backend Communication ---
75
+ video_frames_to_gemini_q = asyncio.Queue(maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
76
+ audio_chunks_to_gemini_q = asyncio.Queue(maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
77
+ audio_from_gemini_playback_q = asyncio.Queue(maxsize=AUDIO_PLAYBACK_QUEUE_MAXSIZE)
78
 
79
+ # --- Gemini Client Setup ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
80
  GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
81
+ client = None
82
+ if GEMINI_API_KEY:
83
+ try:
84
+ client = genai.Client(http_options={"api_version": "v1beta"}, api_key=GEMINI_API_KEY)
85
+ except Exception as e:
86
+ # This error will be shown in Streamlit UI if it happens at startup
87
+ st.error(f"Failed to initialize Gemini client: {e}")
88
+ logging.critical(f"Gemini client initialization failed: {e}", exc_info=True)
89
+ st.stop() # Stop Streamlit app if client fails
90
+ else:
91
+ st.error("GEMINI_API_KEY not found in environment variables. Please set it for the application to run.")
92
+ logging.critical("GEMINI_API_KEY not found.")
93
  st.stop()
94
 
 
 
 
 
 
 
 
 
 
 
 
 
95
  LIVE_CONNECT_CONFIG = types.LiveConnectConfig(
96
+ response_modalities=["audio", "text"],
97
  speech_config=types.SpeechConfig(
98
+ voice_config=types.VoiceConfig(prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr"))
 
 
99
  ),
100
  )
101
 
102
+ # --- Backend Gemini Interaction Loop ---
103
+ class GeminiInteractionLoop:
104
+ def __init__(self):
105
  self.gemini_session = None
106
+ self.async_event_loop = None
107
+ self.is_running = True
108
+ self.playback_stream = None
 
 
 
 
 
 
109
 
110
  async def send_text_input_to_gemini(self, user_text):
111
  if not user_text or not self.gemini_session or not self.is_running:
112
+ logging.warning("Cannot send text. Session not active, no text, or not running.")
113
  return
114
  try:
115
+ logging.info(f"Sending text to Gemini: '{user_text[:50]}...'")
116
  await self.gemini_session.send(input=user_text, end_of_turn=True)
117
  except Exception as e:
118
+ logging.error(f"Error sending text message to Gemini: {e}", exc_info=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
119
 
120
  async def stream_media_to_gemini(self):
121
+ logging.info("Task started: Stream media from WebRTC queues to Gemini.")
122
+ async def get_media_from_queues():
123
+ try:
124
+ video_frame = await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.02)
125
+ video_frames_to_gemini_q.task_done()
126
+ return video_frame
127
+ except asyncio.TimeoutError: pass
128
+ except Exception as e: logging.error(f"Error getting video from queue: {e}", exc_info=True)
129
+
130
+ try:
131
+ audio_chunk = await asyncio.wait_for(audio_chunks_to_gemini_q.get(), timeout=0.02)
132
+ audio_chunks_to_gemini_q.task_done()
133
+ return audio_chunk
134
+ except asyncio.TimeoutError: return None
135
+ except Exception as e:
136
+ logging.error(f"Error getting audio from queue: {e}", exc_info=True)
137
+ return None
138
  try:
139
  while self.is_running:
140
+ if not self.gemini_session:
141
+ await asyncio.sleep(0.1); continue
142
+ media_data = await get_media_from_queues()
143
+ if media_data and self.gemini_session and self.is_running:
144
+ try:
145
+ await self.gemini_session.send(input=media_data)
146
+ except Exception as e: logging.error(f"Error sending media chunk to Gemini: {e}", exc_info=True)
147
+ elif not media_data: await asyncio.sleep(0.05)
148
+ except asyncio.CancelledError: logging.info("Task cancelled: stream_media_to_gemini.")
149
+ finally: logging.info("Task finished: stream_media_to_gemini.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
150
 
151
  async def process_gemini_responses(self):
152
+ logging.info("Task started: Process responses from Gemini.")
153
  try:
154
  while self.is_running:
155
+ if not self.gemini_session:
156
  await asyncio.sleep(0.1); continue
157
  try:
158
+ turn_response = self.gemini_session.receive()
159
  async for chunk in turn_response:
160
  if not self.is_running: break
161
+ if audio_data := chunk.data:
162
+ if not audio_from_gemini_playback_q.full():
163
+ audio_from_gemini_playback_q.put_nowait(audio_data)
164
+ else: logging.warning("Audio playback queue full, discarding Gemini audio data.")
165
+ if text_response := chunk.text:
166
+ logging.info(f"Gemini text response: {text_response[:100]}")
167
+ if 'chat_messages' not in st.session_state: st.session_state.chat_messages = []
168
+ st.session_state.chat_messages = st.session_state.chat_messages + [{"role": "assistant", "content": text_response}]
169
+ # Consider st.experimental_rerun() if a mechanism exists to call it from main thread
170
+ except types.generation_types.StopCandidateException: logging.info("Gemini response stream ended normally.")
 
 
 
171
  except Exception as e:
172
+ if self.is_running: logging.error(f"Error receiving from Gemini: {e}", exc_info=True)
173
+ await asyncio.sleep(0.1)
174
+ except asyncio.CancelledError: logging.info("Task cancelled: process_gemini_responses.")
175
+ finally: logging.info("Task finished: process_gemini_responses.")
 
 
 
176
 
177
  async def play_gemini_audio(self):
178
+ logging.info("Task started: Play Gemini audio responses.")
179
  try:
180
  self.playback_stream = await asyncio.to_thread(
181
+ pya.open, format=PYAUDIO_FORMAT, channels=PYAUDIO_CHANNELS, rate=GEMINI_AUDIO_RECEIVE_SAMPLE_RATE, output=True, frames_per_buffer=PYAUDIO_PLAYBACK_CHUNK_SIZE
182
  )
183
+ logging.info(f"PyAudio playback stream opened at {GEMINI_AUDIO_RECEIVE_SAMPLE_RATE} Hz.")
184
  while self.is_running:
185
  try:
186
+ audio_chunk = await asyncio.wait_for(audio_from_gemini_playback_q.get(), timeout=1.0)
187
+ if audio_chunk: # Not None (sentinel)
188
  await asyncio.to_thread(self.playback_stream.write, audio_chunk)
189
+ if audio_chunk: audio_from_gemini_playback_q.task_done()
190
+ except asyncio.TimeoutError: continue
191
+ except Exception as e: logging.error(f"Error playing audio: {e}", exc_info=True); await asyncio.sleep(0.01)
 
 
 
 
192
  except Exception as e:
193
+ logging.error(f"Failed to open PyAudio playback stream: {e}", exc_info=True)
 
194
  self.is_running = False
195
  finally:
196
  if self.playback_stream:
197
+ logging.info("Stopping and closing PyAudio playback stream.")
198
  await asyncio.to_thread(self.playback_stream.stop_stream)
199
  await asyncio.to_thread(self.playback_stream.close)
200
  self.playback_stream = None
201
+ logging.info("Task finished: play_gemini_audio.")
202
 
203
  def signal_stop(self):
204
+ logging.info("Signal to stop GeminiInteractionLoop received.")
205
  self.is_running = False
206
+ for q in [video_frames_to_gemini_q, audio_chunks_to_gemini_q, audio_from_gemini_playback_q]:
207
+ try: q.put_nowait(None) # Sentinel to unblock .get()
208
+ except asyncio.QueueFull: logging.warning(f"Queue was full when trying to put sentinel for stop signal.")
209
+ except Exception as e: logging.error(f"Error putting sentinel in queue: {e}", exc_info=True)
210
+
211
 
212
    async def run_main_loop(self):
        """Own the full lifetime of one Gemini Live session.

        Connects to the Gemini Live API, sends the medical-assistant system
        prompt, then runs the three worker coroutines (media upload, response
        processing, audio playback) inside a TaskGroup until the session ends
        or is stopped. Always resets ``is_running``/``gemini_session`` on exit.
        """
        # Capture the loop so other threads (e.g. Streamlit's) can schedule
        # coroutines onto it via run_coroutine_threadsafe.
        self.async_event_loop = asyncio.get_running_loop()
        self.is_running = True
        logging.info("GeminiInteractionLoop run_main_loop starting...")
        if client is None:
            # Client failed to initialize at import time; nothing we can do here.
            logging.critical("Gemini client is None in run_main_loop. Aborting.")
            return
        try:
            async with client.aio.live.connect(model=MODEL_NAME, config=LIVE_CONNECT_CONFIG) as session:
                self.gemini_session = session
                logging.info("Gemini session established with API.")
                try:
                    # Prime the model with its role/constraints before any
                    # user media arrives. end_of_turn=False keeps the turn open.
                    logging.info("Sending system prompt to Gemini...")
                    await self.gemini_session.send(input=MEDICAL_ASSISTANT_SYSTEM_PROMPT, end_of_turn=False)
                    logging.info("System prompt sent successfully.")
                except Exception as e:
                    # Without the system prompt the session is not usable as
                    # intended; abort rather than run unconfigured.
                    logging.error(f"Failed to send system prompt: {e}", exc_info=True)
                    self.is_running = False; return
                # TaskGroup cancels siblings if any task fails, and the
                # `async with` block only exits once all three tasks finish.
                async with asyncio.TaskGroup() as tg:
                    logging.info("Creating async tasks for Gemini interaction...")
                    tg.create_task(self.stream_media_to_gemini(), name="stream_media_to_gemini")
                    tg.create_task(self.process_gemini_responses(), name="process_gemini_responses")
                    tg.create_task(self.play_gemini_audio(), name="play_gemini_audio")
                    logging.info("All Gemini interaction tasks created in TaskGroup.")
                logging.info("Gemini TaskGroup finished execution.")
        except asyncio.CancelledError: logging.info("GeminiInteractionLoop.run_main_loop() was cancelled.")
        except ExceptionGroup as eg:
            # TaskGroup wraps worker failures in an ExceptionGroup; log each
            # member individually so no failure is hidden.
            logging.error(f"ExceptionGroup caught in GeminiInteractionLoop: {eg}")
            for i, exc in enumerate(eg.exceptions):
                logging.error(f"  Task Exception {i+1}/{len(eg.exceptions)}: {type(exc).__name__}: {exc}", exc_info=exc)
        except Exception as e:
            logging.error(f"General Exception in GeminiInteractionLoop: {type(e).__name__}: {e}", exc_info=True)
        finally:
            # Ensure a clean slate regardless of how the session ended.
            logging.info("GeminiInteractionLoop.run_main_loop() finishing...")
            self.is_running = False
            self.gemini_session = None
            logging.info("GeminiInteractionLoop finished.")
251
+
252
+ # --- WebRTC Media Processors ---
253
class VideoProcessor(VideoProcessorBase):
    """WebRTC video hook: throttles incoming camera frames and forwards them,
    base64-JPEG encoded, to the queue consumed by the Gemini loop."""

    def __init__(self):
        self.frame_counter = 0
        self.last_gemini_send_time = time.monotonic()

    async def _process_and_queue_frame_async(self, frame_ndarray):
        """Encode one BGR frame and enqueue it, honoring the FPS cap."""
        self.frame_counter += 1
        now = time.monotonic()
        if (now - self.last_gemini_send_time) < (1.0 / VIDEO_FPS_TO_GEMINI):
            # Too soon after the previous API frame; skip this one.
            return
        self.last_gemini_send_time = now
        try:
            # OpenCV delivers BGR; PIL expects RGB. Shrink to the API limit
            # while preserving aspect ratio.
            rgb_array = cv2.cvtColor(frame_ndarray, cv2.COLOR_BGR2RGB)
            image = PIL.Image.fromarray(rgb_array)
            image.thumbnail(VIDEO_API_RESIZE)
            jpeg_buffer = io.BytesIO()
            image.save(jpeg_buffer, format="jpeg")
            payload = {
                "mime_type": "image/jpeg",
                "data": base64.b64encode(jpeg_buffer.getvalue()).decode(),
            }
            if video_frames_to_gemini_q.full():
                # Discard the oldest pending frame to make room for this one.
                try:
                    await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.01)
                except asyncio.TimeoutError:
                    logging.warning("Video queue full, frame dropped.")
                    return
            video_frames_to_gemini_q.put_nowait(payload)
        except Exception as e:
            logging.error(f"Error processing/queueing video frame: {e}", exc_info=True)

    async def recv(self, frame):
        """Entry point called by streamlit-webrtc for each incoming frame."""
        bgr_array = frame.to_ndarray(format="bgr24")
        # Fire-and-forget: never block the WebRTC pipeline on encoding work.
        asyncio.create_task(self._process_and_queue_frame_async(bgr_array))
        # Return the untouched frame so the browser preview keeps working.
        return frame
285
+
286
class AudioProcessor(AudioProcessorBase):
    """WebRTC audio hook: forwards raw microphone PCM chunks to the queue
    consumed by the Gemini loop."""

    async def _process_and_queue_audio_async(self, audio_frames):
        """Queue each received audio frame for upload to Gemini."""
        for audio_frame in audio_frames:  # aiortc AudioFrame instances
            # NOTE(review): audio is forwarded at whatever sample rate WebRTC
            # negotiated. If Gemini requires a specific rate (e.g. 16000 Hz)
            # and the browser delivers another (e.g. 48000 Hz), recognition
            # may be poor — proper resampling is intentionally omitted here.
            pcm_bytes = audio_frame.planes[0].to_bytes()
            # Advertise the payload as 16-bit linear PCM at the frame's
            # actual rate and channel count.
            mime_type = f"audio/L16;rate={audio_frame.sample_rate};channels={audio_frame.layout.channels}"
            chunk = {"data": pcm_bytes, "mime_type": mime_type}
            try:
                if audio_chunks_to_gemini_q.full():
                    # Drop the oldest chunk to keep latency bounded.
                    try:
                        await asyncio.wait_for(audio_chunks_to_gemini_q.get(), timeout=0.01)
                    except asyncio.TimeoutError:
                        logging.warning("Audio queue full, chunk dropped.")
                        continue
                audio_chunks_to_gemini_q.put_nowait(chunk)
            except Exception as e:
                logging.error(f"Error queueing audio chunk: {e}", exc_info=True)

    async def recv(self, frames):
        """Entry point called by streamlit-webrtc for each audio frame batch."""
        # Fire-and-forget so the WebRTC pipeline is never blocked.
        asyncio.create_task(self._process_and_queue_audio_async(frames))
        return frames
316
+
317
+ # --- Streamlit UI and Application Logic ---
318
def initialize_app_session_state():
    """Seed st.session_state with defaults for any keys not yet present.

    Safe to call on every rerun: existing values are never overwritten.
    """
    initial_values = {
        'gemini_session_active': False,
        'gemini_loop_instance': None,
        'chat_messages': [],
        # Dynamic key forces a fresh WebRTC component instance per app start.
        'webrtc_component_key': f"webrtc_streamer_key_{int(time.time())}",
    }
    for state_key, default_value in initial_values.items():
        if state_key not in st.session_state:
            st.session_state[state_key] = default_value
328
 
 
329
def run_streamlit_app():
    """Render the Streamlit UI and wire it to the Gemini interaction loop.

    Layout: sidebar start/stop controls, a WebRTC camera/mic capture widget
    (active only while a session runs), and a text chat panel. Starting a
    session drains the shared queues and launches ``GeminiInteractionLoop``
    on a daemon thread with its own asyncio event loop.
    """
    st.set_page_config(page_title="Live AI Medical Assistant (HF Spaces)", layout="wide")
    initialize_app_session_state()  # Ensure state is initialized

    st.title("Live AI Medical Assistant")
    st.markdown("Utilizing Gemini Live API via WebRTC on Hugging Face Spaces")

    with st.sidebar:
        st.header("Session Control")
        if not st.session_state.gemini_session_active:
            if st.button("🚀 Start Session", type="primary", use_container_width=True, key="start_session_btn"):
                st.session_state.gemini_session_active = True
                st.session_state.chat_messages = [{"role": "system", "content": "Assistant activating. Please allow camera/microphone access in your browser if prompted."}]

                # Drain stale media left over from a previous session so the
                # new loop starts from empty queues.
                for q in [video_frames_to_gemini_q, audio_chunks_to_gemini_q, audio_from_gemini_playback_q]:
                    while not q.empty():
                        try: q.get_nowait()
                        except asyncio.QueueEmpty: break

                gemini_loop = GeminiInteractionLoop()
                st.session_state.gemini_loop_instance = gemini_loop
                # The loop needs its own event loop, separate from Streamlit's
                # script thread, hence asyncio.run on a daemon thread.
                threading.Thread(target=lambda: asyncio.run(gemini_loop.run_main_loop()), name="GeminiLoopThread", daemon=True).start()
                st.success("Gemini session starting... WebRTC will attempt to connect.")
                st.session_state.webrtc_component_key = f"webrtc_streamer_key_{int(time.time())}"  # Force re-render of WebRTC
                st.rerun()
        else:  # Session is active
            if st.button("🛑 Stop Session", type="secondary", use_container_width=True, key="stop_session_btn"):
                if st.session_state.gemini_loop_instance:
                    st.session_state.gemini_loop_instance.signal_stop()
                    st.session_state.gemini_loop_instance = None
                st.session_state.gemini_session_active = False
                st.warning("Session stopping...")
                # Brief pause gives the loop thread a chance to observe the
                # stop signal before the page reruns.
                time.sleep(0.5)
                st.rerun()

    if st.session_state.gemini_session_active:
        st.subheader("Your Live Feed (from your browser)")
        RTC_CONFIGURATION = RTCConfiguration({"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]})
        MEDIA_STREAM_CONSTRAINTS = {
            "video": True,  # Or specific constraints like {"width": 640, "height": 480}
            "audio": {  # Request specific audio format
                "sampleRate": {"ideal": WEBRTC_REQUESTED_SEND_SAMPLE_RATE},
                "channelCount": {"exact": WEBRTC_REQUESTED_AUDIO_CHANNELS},
                "echoCancellation": True,  # Recommended for voice
                "noiseSuppression": True  # Recommended for voice
            }
        }

        # SENDONLY: the browser streams media to us; playback of Gemini's
        # voice happens server-side via PyAudio, not through WebRTC.
        webrtc_ctx = webrtc_streamer(
            key=st.session_state.webrtc_component_key,
            mode=WebRtcMode.SENDONLY,
            rtc_configuration=RTC_CONFIGURATION,
            media_stream_constraints=MEDIA_STREAM_CONSTRAINTS,
            video_processor_factory=VideoProcessor,
            audio_processor_factory=AudioProcessor,
            async_processing=True,
            # desired_playing_state=st.session_state.gemini_session_active  # Let it be controlled by rendering
        )

        if webrtc_ctx.state.playing:
            st.caption("WebRTC connected. Streaming your camera and microphone.")
        elif st.session_state.gemini_session_active:
            st.caption("WebRTC attempting to connect. Ensure camera/microphone permissions are granted in your browser.")
            if webrtc_ctx.state.error:
                st.error(f"WebRTC Connection Error: {webrtc_ctx.state.error}")
    else:
        st.info("Click 'Start Session' in the sidebar to enable the live feed and assistant.")

    st.subheader("Chat with Assistant")
    chat_placeholder = st.container()  # Use a container for chat messages
    with chat_placeholder:
        for msg in st.session_state.get('chat_messages', []):
            with st.chat_message(msg["role"]):
                st.write(msg["content"])

    user_chat_input = st.chat_input(
        "Type your message...",
        key="user_chat_input_box",
        disabled=not st.session_state.gemini_session_active
    )

    if user_chat_input:
        current_messages = st.session_state.get('chat_messages', [])
        current_messages.append({"role": "user", "content": user_chat_input})
        st.session_state.chat_messages = current_messages

        # The Gemini loop runs on another thread with its own event loop, so
        # the coroutine must be scheduled cross-thread.
        loop_instance = st.session_state.get('gemini_loop_instance')
        if loop_instance and loop_instance.async_event_loop and loop_instance.gemini_session:
            if loop_instance.async_event_loop.is_running():
                asyncio.run_coroutine_threadsafe(
                    loop_instance.send_text_input_to_gemini(user_chat_input),
                    loop_instance.async_event_loop
                )
            else: st.error("Session event loop is not running. Cannot send message.")
        elif not loop_instance or not st.session_state.gemini_session_active:
            st.error("Session is not active. Please start a session to send messages.")
        else: st.warning("Session components not fully ready. Please wait a moment.")
        st.rerun()
 
427
 
428
if __name__ == "__main__":
    # Final check before running: without a Gemini client the app is useless.
    if client is not None:
        run_streamlit_app()
    else:
        logging.critical("Gemini client could not be initialized. Application cannot start.")