noumanjavaid committed on
Commit
5bef5e2
·
verified ·
1 Parent(s): 99636da

Update src/streamlit_app.py

Browse files
Files changed (1) hide show
  1. src/streamlit_app.py +87 -89
src/streamlit_app.py CHANGED
@@ -6,7 +6,7 @@ import base64
6
  import io
7
  import threading
8
  import traceback
9
- import atexit
10
  import time
11
  import logging
12
  from dotenv import load_dotenv
@@ -25,7 +25,7 @@ from streamlit_webrtc import (
25
  AudioProcessorBase,
26
  VideoProcessorBase,
27
  )
28
- # from aiortc import RTCIceServer, RTCConfiguration # RTCConfiguration object not needed directly
29
 
30
  # --- Configuration ---
31
  load_dotenv()
@@ -45,6 +45,7 @@ MEDIA_TO_GEMINI_QUEUE_MAXSIZE = 30
45
  VIDEO_FPS_TO_GEMINI = 2
46
  VIDEO_API_RESIZE = (1024, 1024)
47
 
 
48
  MODEL_NAME = "models/gemini-2.0-flash-live-001"
49
 
50
  MEDICAL_ASSISTANT_SYSTEM_PROMPT = """You are an AI Medical Assistant. Your primary function is to analyze visual information from the user's camera or screen and respond via voice.
@@ -91,12 +92,16 @@ else:
91
  logging.critical("GEMINI_API_KEY not found.")
92
  st.stop()
93
 
 
 
 
94
  LIVE_CONNECT_CONFIG = types.LiveConnectConfig(
95
  response_modalities=["audio", "text"],
96
- speech_config=types.SpeechConfig(
97
- voice_config=types.VoiceConfig(prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr"))
98
- ),
99
  )
 
100
 
101
  # --- Backend Gemini Interaction Loop ---
102
  class GeminiInteractionLoop:
@@ -105,8 +110,6 @@ class GeminiInteractionLoop:
105
  self.async_event_loop = None
106
  self.is_running = True
107
  self.playback_stream = None
108
- # Queues will be initialized in run_main_loop and assigned to global vars
109
- # This class will use the global queue variables directly
110
 
111
  async def send_text_input_to_gemini(self, user_text):
112
  if not user_text or not self.gemini_session or not self.is_running:
@@ -121,33 +124,24 @@ class GeminiInteractionLoop:
121
  async def stream_media_to_gemini(self):
122
  logging.info("Task started: Stream media from WebRTC queues to Gemini.")
123
  async def get_media_from_queues():
124
- # Ensure queues are initialized before trying to get from them
125
  if video_frames_to_gemini_q is None or audio_chunks_to_gemini_q is None:
126
- await asyncio.sleep(0.1) # Wait for queues to be initialized
127
- return None
128
  try:
129
  video_frame = await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.02)
130
- video_frames_to_gemini_q.task_done()
131
- return video_frame
132
  except asyncio.TimeoutError: pass
133
  except Exception as e: logging.error(f"Error getting video from queue: {e}", exc_info=True)
134
-
135
  try:
136
  audio_chunk = await asyncio.wait_for(audio_chunks_to_gemini_q.get(), timeout=0.02)
137
- audio_chunks_to_gemini_q.task_done()
138
- return audio_chunk
139
  except asyncio.TimeoutError: return None
140
- except Exception as e:
141
- logging.error(f"Error getting audio from queue: {e}", exc_info=True)
142
- return None
143
  try:
144
  while self.is_running:
145
- if not self.gemini_session:
146
- await asyncio.sleep(0.1); continue
147
  media_data = await get_media_from_queues()
148
  if media_data and self.gemini_session and self.is_running:
149
- try:
150
- await self.gemini_session.send(input=media_data)
151
  except Exception as e: logging.error(f"Error sending media chunk to Gemini: {e}", exc_info=True)
152
  elif not media_data: await asyncio.sleep(0.05)
153
  except asyncio.CancelledError: logging.info("Task cancelled: stream_media_to_gemini.")
@@ -157,21 +151,19 @@ class GeminiInteractionLoop:
157
  logging.info("Task started: Process responses from Gemini.")
158
  try:
159
  while self.is_running:
160
- if not self.gemini_session:
161
- await asyncio.sleep(0.1); continue
162
- if audio_from_gemini_playback_q is None: # Wait for queue init
163
- await asyncio.sleep(0.1); continue
164
  try:
165
  turn_response = self.gemini_session.receive()
166
  async for chunk in turn_response:
167
  if not self.is_running: break
168
  if audio_data := chunk.data:
169
- if not audio_from_gemini_playback_q.full():
170
- audio_from_gemini_playback_q.put_nowait(audio_data)
171
  else: logging.warning("Audio playback queue full, discarding Gemini audio data.")
172
  if text_response := chunk.text:
173
  logging.info(f"Gemini text response: {text_response[:100]}")
174
  if 'chat_messages' not in st.session_state: st.session_state.chat_messages = []
 
175
  st.session_state.chat_messages = st.session_state.chat_messages + [{"role": "assistant", "content": text_response}]
176
  except types.generation_types.StopCandidateException: logging.info("Gemini response stream ended normally.")
177
  except Exception as e:
@@ -182,10 +174,10 @@ class GeminiInteractionLoop:
182
 
183
  async def play_gemini_audio(self):
184
  logging.info("Task started: Play Gemini audio responses.")
 
 
185
  try:
186
- # Wait for the playback queue to be initialized
187
- while audio_from_gemini_playback_q is None and self.is_running:
188
- await asyncio.sleep(0.1)
189
  if not self.is_running: return
190
 
191
  self.playback_stream = await asyncio.to_thread(
@@ -195,49 +187,48 @@ class GeminiInteractionLoop:
195
  while self.is_running:
196
  try:
197
  audio_chunk = await asyncio.wait_for(audio_from_gemini_playback_q.get(), timeout=1.0)
198
- if audio_chunk:
199
- await asyncio.to_thread(self.playback_stream.write, audio_chunk)
200
  if audio_chunk: audio_from_gemini_playback_q.task_done()
201
  except asyncio.TimeoutError: continue
202
- except Exception as e: logging.error(f"Error playing audio: {e}", exc_info=True); await asyncio.sleep(0.01)
203
  except Exception as e:
204
- logging.error(f"Failed to open PyAudio playback stream: {e}", exc_info=True)
205
- self.is_running = False
206
  finally:
207
  if self.playback_stream:
208
  logging.info("Stopping and closing PyAudio playback stream.")
209
- await asyncio.to_thread(self.playback_stream.stop_stream)
210
- await asyncio.to_thread(self.playback_stream.close)
 
 
 
211
  self.playback_stream = None
212
  logging.info("Task finished: play_gemini_audio.")
213
 
214
  def signal_stop(self):
215
  logging.info("Signal to stop GeminiInteractionLoop received.")
216
  self.is_running = False
217
- # Use global queue variables directly
218
  for q in [video_frames_to_gemini_q, audio_chunks_to_gemini_q, audio_from_gemini_playback_q]:
219
- if q: # Check if queue is initialized
220
  try: q.put_nowait(None)
221
  except asyncio.QueueFull: logging.warning(f"Queue was full when trying to put sentinel for stop signal.")
222
  except Exception as e: logging.error(f"Error putting sentinel in queue: {e}", exc_info=True)
223
 
224
  async def run_main_loop(self):
225
- global video_frames_to_gemini_q, audio_chunks_to_gemini_q, audio_from_gemini_playback_q # Allow modification of global vars
226
 
227
  self.async_event_loop = asyncio.get_running_loop()
228
  self.is_running = True
229
  logging.info("GeminiInteractionLoop run_main_loop starting...")
230
 
231
- # Initialize queues here, within the asyncio loop of this thread
232
  video_frames_to_gemini_q = asyncio.Queue(maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
233
  audio_chunks_to_gemini_q = asyncio.Queue(maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
234
  audio_from_gemini_playback_q = asyncio.Queue(maxsize=AUDIO_PLAYBACK_QUEUE_MAXSIZE)
235
  logging.info("Asyncio queues initialized in GeminiInteractionLoop.")
236
 
237
- if client is None:
238
- logging.critical("Gemini client is None in run_main_loop. Aborting.")
239
- return
240
 
 
241
  try:
242
  async with client.aio.live.connect(model=MODEL_NAME, config=LIVE_CONNECT_CONFIG) as session:
243
  self.gemini_session = session
@@ -251,24 +242,32 @@ class GeminiInteractionLoop:
251
  self.is_running = False; return
252
 
253
  async with asyncio.TaskGroup() as tg:
 
254
  logging.info("Creating async tasks for Gemini interaction...")
255
  tg.create_task(self.stream_media_to_gemini(), name="stream_media_to_gemini")
256
  tg.create_task(self.process_gemini_responses(), name="process_gemini_responses")
257
  tg.create_task(self.play_gemini_audio(), name="play_gemini_audio")
258
  logging.info("All Gemini interaction tasks created in TaskGroup.")
259
  logging.info("Gemini TaskGroup finished execution.")
 
260
  except asyncio.CancelledError: logging.info("GeminiInteractionLoop.run_main_loop() was cancelled.")
261
- except ExceptionGroup as eg:
262
- logging.error(f"ExceptionGroup caught in GeminiInteractionLoop: {eg}")
263
- for i, exc in enumerate(eg.exceptions):
264
- logging.error(f" Task Exception {i+1}/{len(eg.exceptions)}: {type(exc).__name__}: {exc}", exc_info=exc)
265
  except Exception as e:
266
- logging.error(f"General Exception in GeminiInteractionLoop: {type(e).__name__}: {e}", exc_info=True)
 
267
  finally:
268
  logging.info("GeminiInteractionLoop.run_main_loop() finishing...")
269
- self.is_running = False
270
- self.gemini_session = None
271
- # Clear queues on exit to prevent issues if loop restarts
 
 
 
 
 
 
 
 
272
  video_frames_to_gemini_q = None
273
  audio_chunks_to_gemini_q = None
274
  audio_from_gemini_playback_q = None
@@ -281,15 +280,10 @@ class VideoProcessor(VideoProcessorBase):
281
  self.last_gemini_send_time = time.monotonic()
282
 
283
  async def _process_and_queue_frame_async(self, frame_ndarray):
284
- if video_frames_to_gemini_q is None: # Wait for queue to be initialized
285
- logging.debug("VideoProcessor: video_frames_to_gemini_q is None, waiting...")
286
- return
287
-
288
  self.frame_counter += 1
289
  current_time = time.monotonic()
290
- if (current_time - self.last_gemini_send_time) < (1.0 / VIDEO_FPS_TO_GEMINI):
291
- return
292
-
293
  self.last_gemini_send_time = current_time
294
  try:
295
  img_rgb = cv2.cvtColor(frame_ndarray, cv2.COLOR_BGR2RGB)
@@ -299,7 +293,6 @@ class VideoProcessor(VideoProcessorBase):
299
  pil_img.save(image_io, format="jpeg")
300
  image_bytes = image_io.getvalue()
301
  api_data = {"mime_type": "image/jpeg", "data": base64.b64encode(image_bytes).decode()}
302
-
303
  if video_frames_to_gemini_q.full():
304
  try: await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.01)
305
  except asyncio.TimeoutError: logging.warning("Video queue full, frame dropped."); return
@@ -308,27 +301,19 @@ class VideoProcessor(VideoProcessorBase):
308
 
309
  async def recv(self, frame):
310
  img_bgr = frame.to_ndarray(format="bgr24")
311
- # Ensure an event loop is running in the current thread for create_task
312
  try:
313
  loop = asyncio.get_running_loop()
314
  loop.create_task(self._process_and_queue_frame_async(img_bgr))
315
- except RuntimeError: # No running loop in this thread (should not happen with streamlit-webrtc async_processing=True)
316
- logging.error("VideoProcessor.recv: No running asyncio loop in current thread for create_task.")
317
- # Fallback or log error, direct call might block WebRTC thread
318
- # await self._process_and_queue_frame_async(img_bgr) # Potentially blocking
319
  return frame
320
 
321
  class AudioProcessor(AudioProcessorBase):
322
  async def _process_and_queue_audio_async(self, audio_frames):
323
- if audio_chunks_to_gemini_q is None: # Wait for queue to be initialized
324
- logging.debug("AudioProcessor: audio_chunks_to_gemini_q is None, waiting...")
325
- return
326
-
327
  for frame in audio_frames:
328
  audio_data = frame.planes[0].to_bytes()
329
  mime_type = f"audio/L16;rate={frame.sample_rate};channels={frame.layout.channels}"
330
  api_data = {"data": audio_data, "mime_type": mime_type}
331
-
332
  try:
333
  if audio_chunks_to_gemini_q.full():
334
  try: await asyncio.wait_for(audio_chunks_to_gemini_q.get(), timeout=0.01)
@@ -340,9 +325,7 @@ class AudioProcessor(AudioProcessorBase):
340
  try:
341
  loop = asyncio.get_running_loop()
342
  loop.create_task(self._process_and_queue_audio_async(frames))
343
- except RuntimeError:
344
- logging.error("AudioProcessor.recv: No running asyncio loop in current thread for create_task.")
345
- # await self._process_and_queue_audio_async(frames) # Potentially blocking
346
  return frames
347
 
348
  # --- Streamlit UI and Application Logic ---
@@ -363,6 +346,8 @@ def run_streamlit_app():
363
 
364
  st.title("Live AI Medical Assistant")
365
  st.markdown("Utilizing Gemini Live API via WebRTC on Hugging Face Spaces")
 
 
366
 
367
  with st.sidebar:
368
  st.header("Session Control")
@@ -402,11 +387,6 @@ def run_streamlit_app():
402
  }
403
  }
404
 
405
- # Only render WebRTC streamer if queues are expected to be initialized by the Gemini loop
406
- # This is a bit of a race condition check, might need refinement
407
- # A better way would be for Gemini loop to signal when queues are ready.
408
- # For now, we assume if session is active, loop is trying to start and init queues.
409
-
410
  webrtc_ctx = webrtc_streamer(
411
  key=st.session_state.webrtc_component_key,
412
  mode=WebRtcMode.SENDONLY,
@@ -414,8 +394,8 @@ def run_streamlit_app():
414
  "iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]
415
  },
416
  media_stream_constraints=MEDIA_STREAM_CONSTRAINTS,
417
- video_processor_factory=VideoProcessor, # Pass the class
418
- audio_processor_factory=AudioProcessor, # Pass the class
419
  async_processing=True,
420
  )
421
 
@@ -429,12 +409,16 @@ def run_streamlit_app():
429
  st.info("Click 'Start Session' in the sidebar to enable the live feed and assistant.")
430
 
431
  st.subheader("Chat with Assistant")
432
- chat_placeholder = st.container()
433
- with chat_placeholder:
434
- for msg in st.session_state.get('chat_messages', []):
 
 
 
435
  with st.chat_message(msg["role"]):
436
  st.write(msg["content"])
437
-
 
438
  user_chat_input = st.chat_input(
439
  "Type your message...",
440
  key="user_chat_input_box",
@@ -442,25 +426,39 @@ def run_streamlit_app():
442
  )
443
 
444
  if user_chat_input:
445
- current_messages = st.session_state.get('chat_messages', [])
446
- current_messages.append({"role": "user", "content": user_chat_input})
447
- st.session_state.chat_messages = current_messages
448
 
 
449
  loop_instance = st.session_state.get('gemini_loop_instance')
450
  if loop_instance and loop_instance.async_event_loop and loop_instance.gemini_session:
451
  if loop_instance.async_event_loop.is_running():
452
- asyncio.run_coroutine_threadsafe(
 
453
  loop_instance.send_text_input_to_gemini(user_chat_input),
454
  loop_instance.async_event_loop
455
  )
 
 
 
 
 
 
 
456
  else: st.error("Session event loop is not running. Cannot send message.")
457
  elif not loop_instance or not st.session_state.gemini_session_active:
458
  st.error("Session is not active. Please start a session to send messages.")
459
  else: st.warning("Session components not fully ready. Please wait a moment.")
 
 
460
  st.rerun()
461
 
462
  if __name__ == "__main__":
 
463
  if client is None:
 
464
  logging.critical("Gemini client could not be initialized. Application cannot start.")
 
 
465
  else:
466
  run_streamlit_app()
 
6
  import io
7
  import threading
8
  import traceback
9
+ import atexit # Correctly imported
10
  import time
11
  import logging
12
  from dotenv import load_dotenv
 
25
  AudioProcessorBase,
26
  VideoProcessorBase,
27
  )
28
+ # from aiortc import RTCIceServer, RTCConfiguration # Not needed directly
29
 
30
  # --- Configuration ---
31
  load_dotenv()
 
45
  VIDEO_FPS_TO_GEMINI = 2
46
  VIDEO_API_RESIZE = (1024, 1024)
47
 
48
+ # !!! IMPORTANT: Verify this model name is correct for the Live API !!!
49
  MODEL_NAME = "models/gemini-2.0-flash-live-001"
50
 
51
  MEDICAL_ASSISTANT_SYSTEM_PROMPT = """You are an AI Medical Assistant. Your primary function is to analyze visual information from the user's camera or screen and respond via voice.
 
92
  logging.critical("GEMINI_API_KEY not found.")
93
  st.stop()
94
 
95
+ # Gemini LiveConnectConfig - Simplified for debugging ConnectionClosedError
96
+ # Removed speech_config temporarily. If connection works now, the issue was voice config.
97
+ # If it still fails, check MODEL_NAME and API Key permissions.
98
  LIVE_CONNECT_CONFIG = types.LiveConnectConfig(
99
  response_modalities=["audio", "text"],
100
+ # speech_config=types.SpeechConfig( # Temporarily commented out for debugging
101
+ # voice_config=types.VoiceConfig(prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr"))
102
+ # ),
103
  )
104
+ logging.info(f"Using LiveConnectConfig: {LIVE_CONNECT_CONFIG}") # Log the config being used
105
 
106
  # --- Backend Gemini Interaction Loop ---
107
  class GeminiInteractionLoop:
 
110
  self.async_event_loop = None
111
  self.is_running = True
112
  self.playback_stream = None
 
 
113
 
114
  async def send_text_input_to_gemini(self, user_text):
115
  if not user_text or not self.gemini_session or not self.is_running:
 
124
  async def stream_media_to_gemini(self):
125
  logging.info("Task started: Stream media from WebRTC queues to Gemini.")
126
  async def get_media_from_queues():
 
127
  if video_frames_to_gemini_q is None or audio_chunks_to_gemini_q is None:
128
+ await asyncio.sleep(0.1); return None
 
129
  try:
130
  video_frame = await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.02)
131
+ video_frames_to_gemini_q.task_done(); return video_frame
 
132
  except asyncio.TimeoutError: pass
133
  except Exception as e: logging.error(f"Error getting video from queue: {e}", exc_info=True)
 
134
  try:
135
  audio_chunk = await asyncio.wait_for(audio_chunks_to_gemini_q.get(), timeout=0.02)
136
+ audio_chunks_to_gemini_q.task_done(); return audio_chunk
 
137
  except asyncio.TimeoutError: return None
138
+ except Exception as e: logging.error(f"Error getting audio from queue: {e}", exc_info=True); return None
 
 
139
  try:
140
  while self.is_running:
141
+ if not self.gemini_session: await asyncio.sleep(0.1); continue
 
142
  media_data = await get_media_from_queues()
143
  if media_data and self.gemini_session and self.is_running:
144
+ try: await self.gemini_session.send(input=media_data)
 
145
  except Exception as e: logging.error(f"Error sending media chunk to Gemini: {e}", exc_info=True)
146
  elif not media_data: await asyncio.sleep(0.05)
147
  except asyncio.CancelledError: logging.info("Task cancelled: stream_media_to_gemini.")
 
151
  logging.info("Task started: Process responses from Gemini.")
152
  try:
153
  while self.is_running:
154
+ if not self.gemini_session: await asyncio.sleep(0.1); continue
155
+ if audio_from_gemini_playback_q is None: await asyncio.sleep(0.1); continue
 
 
156
  try:
157
  turn_response = self.gemini_session.receive()
158
  async for chunk in turn_response:
159
  if not self.is_running: break
160
  if audio_data := chunk.data:
161
+ if not audio_from_gemini_playback_q.full(): audio_from_gemini_playback_q.put_nowait(audio_data)
 
162
  else: logging.warning("Audio playback queue full, discarding Gemini audio data.")
163
  if text_response := chunk.text:
164
  logging.info(f"Gemini text response: {text_response[:100]}")
165
  if 'chat_messages' not in st.session_state: st.session_state.chat_messages = []
166
+ # This direct update might cause issues if multiple updates happen before rerun
167
  st.session_state.chat_messages = st.session_state.chat_messages + [{"role": "assistant", "content": text_response}]
168
  except types.generation_types.StopCandidateException: logging.info("Gemini response stream ended normally.")
169
  except Exception as e:
 
174
 
175
  async def play_gemini_audio(self):
176
  logging.info("Task started: Play Gemini audio responses.")
177
+ # Note: This task might fail in environments without proper ALSA setup (like HF Spaces default)
178
+ # Error handling below will log the error but allow other tasks to continue.
179
  try:
180
+ while audio_from_gemini_playback_q is None and self.is_running: await asyncio.sleep(0.1)
 
 
181
  if not self.is_running: return
182
 
183
  self.playback_stream = await asyncio.to_thread(
 
187
  while self.is_running:
188
  try:
189
  audio_chunk = await asyncio.wait_for(audio_from_gemini_playback_q.get(), timeout=1.0)
190
+ if audio_chunk: await asyncio.to_thread(self.playback_stream.write, audio_chunk)
 
191
  if audio_chunk: audio_from_gemini_playback_q.task_done()
192
  except asyncio.TimeoutError: continue
193
+ except Exception as e: logging.error(f"Error playing audio chunk: {e}", exc_info=True); await asyncio.sleep(0.01)
194
  except Exception as e:
195
+ logging.error(f"Failed to open or use PyAudio playback stream: {e}", exc_info=True)
196
+ # Don't set self.is_running = False here, just log the playback failure
197
  finally:
198
  if self.playback_stream:
199
  logging.info("Stopping and closing PyAudio playback stream.")
200
+ try:
201
+ await asyncio.to_thread(self.playback_stream.stop_stream)
202
+ await asyncio.to_thread(self.playback_stream.close)
203
+ except Exception as e_close:
204
+ logging.error(f"Error closing playback stream: {e_close}", exc_info=True)
205
  self.playback_stream = None
206
  logging.info("Task finished: play_gemini_audio.")
207
 
208
  def signal_stop(self):
209
  logging.info("Signal to stop GeminiInteractionLoop received.")
210
  self.is_running = False
 
211
  for q in [video_frames_to_gemini_q, audio_chunks_to_gemini_q, audio_from_gemini_playback_q]:
212
+ if q:
213
  try: q.put_nowait(None)
214
  except asyncio.QueueFull: logging.warning(f"Queue was full when trying to put sentinel for stop signal.")
215
  except Exception as e: logging.error(f"Error putting sentinel in queue: {e}", exc_info=True)
216
 
217
  async def run_main_loop(self):
218
+ global video_frames_to_gemini_q, audio_chunks_to_gemini_q, audio_from_gemini_playback_q
219
 
220
  self.async_event_loop = asyncio.get_running_loop()
221
  self.is_running = True
222
  logging.info("GeminiInteractionLoop run_main_loop starting...")
223
 
 
224
  video_frames_to_gemini_q = asyncio.Queue(maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
225
  audio_chunks_to_gemini_q = asyncio.Queue(maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
226
  audio_from_gemini_playback_q = asyncio.Queue(maxsize=AUDIO_PLAYBACK_QUEUE_MAXSIZE)
227
  logging.info("Asyncio queues initialized in GeminiInteractionLoop.")
228
 
229
+ if client is None: logging.critical("Gemini client is None in run_main_loop. Aborting."); return
 
 
230
 
231
+ session_task_group = None # Define before try block
232
  try:
233
  async with client.aio.live.connect(model=MODEL_NAME, config=LIVE_CONNECT_CONFIG) as session:
234
  self.gemini_session = session
 
242
  self.is_running = False; return
243
 
244
  async with asyncio.TaskGroup() as tg:
245
+ session_task_group = tg # Assign task group
246
  logging.info("Creating async tasks for Gemini interaction...")
247
  tg.create_task(self.stream_media_to_gemini(), name="stream_media_to_gemini")
248
  tg.create_task(self.process_gemini_responses(), name="process_gemini_responses")
249
  tg.create_task(self.play_gemini_audio(), name="play_gemini_audio")
250
  logging.info("All Gemini interaction tasks created in TaskGroup.")
251
  logging.info("Gemini TaskGroup finished execution.")
252
+
253
  except asyncio.CancelledError: logging.info("GeminiInteractionLoop.run_main_loop() was cancelled.")
254
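+ # Removed the explicit ExceptionGroup handler (that name does not exist before Python 3.11); note that asyncio.TaskGroup used above also requires Python 3.11+, and an ExceptionGroup raised by it is still caught by the generic handler below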
 
 
 
255
  except Exception as e:
256
+ # Log general exceptions, including the ConnectionClosedError if it happens here
257
+ logging.error(f"Exception in GeminiInteractionLoop run_main_loop: {type(e).__name__}: {e}", exc_info=True)
258
  finally:
259
  logging.info("GeminiInteractionLoop.run_main_loop() finishing...")
260
+ self.is_running = False # Ensure flag is set
261
+ # Cancel any potentially lingering tasks if TaskGroup didn't exit cleanly (though it should)
262
+ if session_task_group and not session_task_group._closed:
263
+ logging.warning("TaskGroup did not close cleanly, attempting cancellation.")
264
+ try:
265
+ session_task_group.cancel() # Attempt cancellation if needed
266
+ except Exception as e_cancel:
267
+ logging.error(f"Error cancelling task group: {e_cancel}")
268
+
269
+ self.gemini_session = None # Session closed by async with
270
+ # Clear global queues
271
  video_frames_to_gemini_q = None
272
  audio_chunks_to_gemini_q = None
273
  audio_from_gemini_playback_q = None
 
280
  self.last_gemini_send_time = time.monotonic()
281
 
282
  async def _process_and_queue_frame_async(self, frame_ndarray):
283
+ if video_frames_to_gemini_q is None: return
 
 
 
284
  self.frame_counter += 1
285
  current_time = time.monotonic()
286
+ if (current_time - self.last_gemini_send_time) < (1.0 / VIDEO_FPS_TO_GEMINI): return
 
 
287
  self.last_gemini_send_time = current_time
288
  try:
289
  img_rgb = cv2.cvtColor(frame_ndarray, cv2.COLOR_BGR2RGB)
 
293
  pil_img.save(image_io, format="jpeg")
294
  image_bytes = image_io.getvalue()
295
  api_data = {"mime_type": "image/jpeg", "data": base64.b64encode(image_bytes).decode()}
 
296
  if video_frames_to_gemini_q.full():
297
  try: await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.01)
298
  except asyncio.TimeoutError: logging.warning("Video queue full, frame dropped."); return
 
301
 
302
  async def recv(self, frame):
303
  img_bgr = frame.to_ndarray(format="bgr24")
 
304
  try:
305
  loop = asyncio.get_running_loop()
306
  loop.create_task(self._process_and_queue_frame_async(img_bgr))
307
+ except RuntimeError: logging.error("VideoProcessor.recv: No running asyncio loop in current thread for create_task.")
 
 
 
308
  return frame
309
 
310
  class AudioProcessor(AudioProcessorBase):
311
  async def _process_and_queue_audio_async(self, audio_frames):
312
+ if audio_chunks_to_gemini_q is None: return
 
 
 
313
  for frame in audio_frames:
314
  audio_data = frame.planes[0].to_bytes()
315
  mime_type = f"audio/L16;rate={frame.sample_rate};channels={frame.layout.channels}"
316
  api_data = {"data": audio_data, "mime_type": mime_type}
 
317
  try:
318
  if audio_chunks_to_gemini_q.full():
319
  try: await asyncio.wait_for(audio_chunks_to_gemini_q.get(), timeout=0.01)
 
325
  try:
326
  loop = asyncio.get_running_loop()
327
  loop.create_task(self._process_and_queue_audio_async(frames))
328
+ except RuntimeError: logging.error("AudioProcessor.recv: No running asyncio loop in current thread for create_task.")
 
 
329
  return frames
330
 
331
  # --- Streamlit UI and Application Logic ---
 
346
 
347
  st.title("Live AI Medical Assistant")
348
  st.markdown("Utilizing Gemini Live API via WebRTC on Hugging Face Spaces")
349
+ st.info("Remember: This AI cannot provide medical diagnoses. Always consult a healthcare professional for medical advice.")
350
+
351
 
352
  with st.sidebar:
353
  st.header("Session Control")
 
387
  }
388
  }
389
 
 
 
 
 
 
390
  webrtc_ctx = webrtc_streamer(
391
  key=st.session_state.webrtc_component_key,
392
  mode=WebRtcMode.SENDONLY,
 
394
  "iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]
395
  },
396
  media_stream_constraints=MEDIA_STREAM_CONSTRAINTS,
397
+ video_processor_factory=VideoProcessor,
398
+ audio_processor_factory=AudioProcessor,
399
  async_processing=True,
400
  )
401
 
 
409
  st.info("Click 'Start Session' in the sidebar to enable the live feed and assistant.")
410
 
411
  st.subheader("Chat with Assistant")
412
+ # Group the chat history in a container (st.container() is called without a height here, so it is not fixed-height or scrollable)
413
+ chat_container = st.container()
414
+ with chat_container:
415
+ # Display messages from session state
416
+ messages = st.session_state.get('chat_messages', [])
417
+ for msg in messages:
418
  with st.chat_message(msg["role"]):
419
  st.write(msg["content"])
420
+
421
+ # Chat input outside the container
422
  user_chat_input = st.chat_input(
423
  "Type your message...",
424
  key="user_chat_input_box",
 
426
  )
427
 
428
  if user_chat_input:
429
+ # Append user message immediately for responsiveness
430
+ st.session_state.chat_messages = st.session_state.get('chat_messages', []) + [{"role": "user", "content": user_chat_input}]
 
431
 
432
+ # Send to backend
433
  loop_instance = st.session_state.get('gemini_loop_instance')
434
  if loop_instance and loop_instance.async_event_loop and loop_instance.gemini_session:
435
  if loop_instance.async_event_loop.is_running():
436
+ # Use run_coroutine_threadsafe to call async func from Streamlit thread
437
+ future = asyncio.run_coroutine_threadsafe(
438
  loop_instance.send_text_input_to_gemini(user_chat_input),
439
  loop_instance.async_event_loop
440
  )
441
+ try:
442
+ future.result(timeout=2) # Optional: wait briefly for confirmation
443
+ except TimeoutError:
444
+ logging.warning("Timed out waiting for send_text_input_to_gemini confirmation.")
445
+ except Exception as e:
446
+ logging.error(f"Error calling send_text_input_to_gemini: {e}", exc_info=True)
447
+
448
  else: st.error("Session event loop is not running. Cannot send message.")
449
  elif not loop_instance or not st.session_state.gemini_session_active:
450
  st.error("Session is not active. Please start a session to send messages.")
451
  else: st.warning("Session components not fully ready. Please wait a moment.")
452
+
453
+ # Rerun to display the user message and potentially any quick text response from AI
454
  st.rerun()
455
 
456
  if __name__ == "__main__":
457
+ # Final check before running the app
458
  if client is None:
459
+ # Log critical error if client is still None (should have been caught earlier)
460
  logging.critical("Gemini client could not be initialized. Application cannot start.")
461
+ # Display error in Streamlit if possible (might not render if client init failed early)
462
+ st.error("CRITICAL ERROR: Gemini client failed to initialize. Check API Key and logs.")
463
  else:
464
  run_streamlit_app()