noumanjavaid committed on
Commit
dee6e00
·
verified ·
1 Parent(s): ff707de

Update src/streamlit_app.py

Browse files
Files changed (1) hide show
  1. src/streamlit_app.py +205 -98
src/streamlit_app.py CHANGED
@@ -11,41 +11,51 @@ import time
11
  import logging
12
  from dotenv import load_dotenv
13
 
14
- import cv2 # For image processing
15
- import pyaudio # For audio PLAYBACK
16
  import PIL.Image
17
 
18
  from google import genai
19
  from google.genai import types
20
 
21
- # streamlit-webrtc components
22
  from streamlit_webrtc import (
23
  webrtc_streamer,
24
  WebRtcMode,
25
  AudioProcessorBase,
26
  VideoProcessorBase,
27
  )
28
- # from aiortc import RTCIceServer, RTCConfiguration # Not needed directly
29
 
30
- # --- Configuration ---
31
  load_dotenv()
32
- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s')
33
- logging.info("Application starting up...")
34
 
35
  # Audio configuration
36
- PYAUDIO_FORMAT = pyaudio.paInt16
37
- PYAUDIO_CHANNELS = 1
38
- WEBRTC_REQUESTED_AUDIO_CHANNELS = 1
39
- WEBRTC_REQUESTED_SEND_SAMPLE_RATE = 16000
40
- GEMINI_AUDIO_RECEIVE_SAMPLE_RATE = 24000
41
- PYAUDIO_PLAYBACK_CHUNK_SIZE = 1024
42
- AUDIO_PLAYBACK_QUEUE_MAXSIZE = 50
43
- MEDIA_TO_GEMINI_QUEUE_MAXSIZE = 30
 
 
 
 
44
 
45
  # Video configuration
 
46
  VIDEO_FPS_TO_GEMINI = 2
47
  VIDEO_API_RESIZE = (1024, 1024)
48
 
 
 
 
 
 
 
 
 
 
49
  # !!! IMPORTANT: Verify this model name is correct for the Live API !!!
50
  MODEL_NAME = "models/gemini-2.0-flash-live-001"
51
  logging.info(f"Using Gemini Model: {MODEL_NAME}")
@@ -68,13 +78,16 @@ Example of a disclaimer you might use: "As an AI assistant, I can describe what
68
  pya = None
69
  try:
70
  pya = pyaudio.PyAudio()
 
71
  def cleanup_pyaudio():
72
  logging.info("Terminating PyAudio instance.")
73
- if pya: pya.terminate()
 
74
  atexit.register(cleanup_pyaudio)
75
  logging.info("PyAudio initialized successfully.")
76
  except Exception as e_pyaudio:
77
- logging.warning(f"PyAudio initialization failed (expected in some server environments): {e_pyaudio}")
 
78
  pya = None
79
 
80
  # --- Global Queues - Declare as None, initialize later ---
@@ -94,17 +107,19 @@ if GEMINI_API_KEY:
94
  logging.critical(f"Gemini client initialization failed: {e}", exc_info=True)
95
  st.stop()
96
  else:
97
- st.error("GEMINI_API_KEY not found in environment variables. Please set it for the application to run.")
 
98
  logging.critical("GEMINI_API_KEY not found.")
99
  st.stop()
100
 
101
  LIVE_CONNECT_CONFIG = types.LiveConnectConfig(
102
- response_modalities=["audio", "text", "video"], # Requesting audio response
103
  speech_config=types.SpeechConfig(
104
  voice_config=types.VoiceConfig(
105
- prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr") # Using Puck voice
 
106
  )
107
- ) # <---------------------------------- CORRECTED: Added missing closing parenthesis
108
  )
109
  logging.info(f"Attempting connection with LiveConnectConfig: {LIVE_CONNECT_CONFIG}")
110
 
@@ -119,7 +134,8 @@ class GeminiInteractionLoop:
119
 
120
  async def send_text_input_to_gemini(self, user_text):
121
  if not user_text or not self.gemini_session or not self.is_running:
122
- logging.warning("Cannot send text. Session not active, no text, or not running.")
 
123
  return
124
  try:
125
  logging.info(f"Sending text to Gemini: '{user_text[:50]}...'")
@@ -127,96 +143,144 @@ class GeminiInteractionLoop:
127
  # For now, keeping session.send as it was working functionally
128
  await self.gemini_session.send(input=user_text, end_of_turn=True)
129
  except Exception as e:
130
- logging.error(f"Error sending text message to Gemini: {e}", exc_info=True)
 
131
 
132
  async def stream_media_to_gemini(self):
133
  logging.info("Task started: Stream media from WebRTC queues to Gemini.")
 
134
  async def get_media_from_queues():
135
  if video_frames_to_gemini_q is None or audio_chunks_to_gemini_q is None:
136
- await asyncio.sleep(0.1); return None
 
137
  try:
138
  video_frame = await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.02)
139
- if video_frame is None: return None # Sentinel received
140
- video_frames_to_gemini_q.task_done(); return video_frame
141
- except asyncio.TimeoutError: pass
142
- except Exception as e: logging.error(f"Error getting video from queue: {e}", exc_info=True)
 
 
 
 
143
  try:
144
  audio_chunk = await asyncio.wait_for(audio_chunks_to_gemini_q.get(), timeout=0.02)
145
- if audio_chunk is None: return None # Sentinel received
146
- audio_chunks_to_gemini_q.task_done(); return audio_chunk
147
- except asyncio.TimeoutError: return None
148
- except Exception as e: logging.error(f"Error getting audio from queue: {e}", exc_info=True); return None
 
 
 
 
 
149
 
150
  try:
151
  while self.is_running:
152
- if not self.gemini_session: await asyncio.sleep(0.1); continue
 
 
153
  media_data = await get_media_from_queues()
154
- if media_data is None and not self.is_running: break # Sentinel and stop signal
 
155
  if media_data and self.gemini_session and self.is_running:
156
  try:
157
  # Use the specific method as suggested by the deprecation warning if possible
158
  # For now, keeping session.send as it was working functionally
159
  await self.gemini_session.send(input=media_data)
160
- except Exception as e: logging.error(f"Error sending media chunk to Gemini: {e}", exc_info=True)
161
- elif not media_data: await asyncio.sleep(0.05) # No data, yield
162
- except asyncio.CancelledError: logging.info("Task cancelled: stream_media_to_gemini.")
163
- finally: logging.info("Task finished: stream_media_to_gemini.")
 
 
 
 
 
164
 
165
  async def process_gemini_responses(self):
166
  logging.info("Task started: Process responses from Gemini.")
167
  try:
168
  while self.is_running:
169
- if not self.gemini_session: await asyncio.sleep(0.1); continue
170
- if audio_from_gemini_playback_q is None: await asyncio.sleep(0.1); continue
 
 
 
 
171
  try:
172
  turn_response = self.gemini_session.receive()
173
  async for chunk in turn_response:
174
- if not self.is_running: break
 
175
  if audio_data := chunk.data:
176
- if not audio_from_gemini_playback_q.full(): audio_from_gemini_playback_q.put_nowait(audio_data)
177
- else: logging.warning("Audio playback queue full, discarding Gemini audio data.")
 
 
 
178
  if text_response := chunk.text:
179
  logging.info(f"Gemini text response: {text_response[:100]}")
180
- if 'chat_messages' not in st.session_state: st.session_state.chat_messages = []
181
- st.session_state.chat_messages = st.session_state.chat_messages + [{"role": "assistant", "content": text_response}]
182
- except types.generation_types.StopCandidateException: logging.info("Gemini response stream ended normally.")
 
 
 
183
  except Exception as e:
184
- if self.is_running: logging.error(f"Error receiving from Gemini: {e}", exc_info=True)
 
 
185
  await asyncio.sleep(0.1)
186
- except asyncio.CancelledError: logging.info("Task cancelled: process_gemini_responses.")
187
- finally: logging.info("Task finished: process_gemini_responses.")
 
 
188
 
189
  async def play_gemini_audio(self):
190
  logging.info("Task started: Play Gemini audio responses.")
191
  if pya is None:
192
- logging.warning("PyAudio not available. Audio playback task will not run.")
193
- return
 
194
 
195
  try:
196
- while audio_from_gemini_playback_q is None and self.is_running: await asyncio.sleep(0.1)
197
- if not self.is_running: return
 
 
198
 
199
  self.playback_stream = await asyncio.to_thread(
200
  pya.open, format=PYAUDIO_FORMAT, channels=PYAUDIO_CHANNELS, rate=GEMINI_AUDIO_RECEIVE_SAMPLE_RATE, output=True, frames_per_buffer=PYAUDIO_PLAYBACK_CHUNK_SIZE
201
  )
202
- logging.info(f"PyAudio playback stream opened at {GEMINI_AUDIO_RECEIVE_SAMPLE_RATE} Hz.")
 
203
  while self.is_running:
204
  try:
205
  audio_chunk = await asyncio.wait_for(audio_from_gemini_playback_q.get(), timeout=1.0)
206
- if audio_chunk is None and not self.is_running: break # Sentinel and stop signal
207
- if audio_chunk: await asyncio.to_thread(self.playback_stream.write, audio_chunk)
208
- if audio_chunk: audio_from_gemini_playback_q.task_done()
209
- except asyncio.TimeoutError: continue
210
- except Exception as e: logging.error(f"Error playing audio chunk: {e}", exc_info=True); await asyncio.sleep(0.01)
 
 
 
 
 
 
211
  except Exception as e:
212
- logging.error(f"Failed to open or use PyAudio playback stream (might be expected in this environment): {e}", exc_info=True)
 
213
  finally:
214
  if self.playback_stream:
215
  logging.info("Stopping and closing PyAudio playback stream.")
216
  try:
217
  await asyncio.to_thread(self.playback_stream.stop_stream)
218
  await asyncio.to_thread(self.playback_stream.close)
219
- except Exception as e_close: logging.error(f"Error closing playback stream: {e_close}", exc_info=True)
 
 
220
  self.playback_stream = None
221
  logging.info("Task finished: play_gemini_audio.")
222
 
@@ -227,9 +291,14 @@ class GeminiInteractionLoop:
227
  ("audio_in_q", audio_chunks_to_gemini_q),
228
  ("audio_out_q", audio_from_gemini_playback_q)]:
229
  if q_obj_ref:
230
- try: q_obj_ref.put_nowait(None)
231
- except asyncio.QueueFull: logging.warning(f"Queue {q_name} was full when trying to put sentinel for stop signal.")
232
- except Exception as e: logging.error(f"Error putting sentinel in {q_name}: {e}", exc_info=True)
 
 
 
 
 
233
 
234
  async def run_main_loop(self):
235
  global video_frames_to_gemini_q, audio_chunks_to_gemini_q, audio_from_gemini_playback_q
@@ -238,59 +307,77 @@ class GeminiInteractionLoop:
238
  self.is_running = True
239
  logging.info("GeminiInteractionLoop run_main_loop starting...")
240
 
241
- video_frames_to_gemini_q = asyncio.Queue(maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
242
- audio_chunks_to_gemini_q = asyncio.Queue(maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
243
- audio_from_gemini_playback_q = asyncio.Queue(maxsize=AUDIO_PLAYBACK_QUEUE_MAXSIZE)
 
 
 
244
  logging.info("Asyncio queues initialized in GeminiInteractionLoop.")
245
 
246
- if client is None: logging.critical("Gemini client is None in run_main_loop. Aborting."); return
 
 
 
247
 
248
  try:
249
  async with client.aio.live.connect(model=MODEL_NAME, config=LIVE_CONNECT_CONFIG) as session:
250
  self.gemini_session = session
251
- logging.info(f"Gemini session established with API for model {MODEL_NAME}.")
 
252
  try:
253
  logging.info("Sending system prompt to Gemini...")
254
  await self.gemini_session.send(input=MEDICAL_ASSISTANT_SYSTEM_PROMPT, end_of_turn=False)
255
  logging.info("System prompt sent successfully.")
256
  except Exception as e:
257
- logging.error(f"Failed to send system prompt: {e}", exc_info=True)
258
- self.is_running = False; return
 
 
259
 
260
  # Using asyncio.gather for Python 3.9 compatibility
261
  tasks = []
262
  try:
263
  logging.info("Creating async tasks for Gemini interaction...")
264
- tasks.append(asyncio.create_task(self.stream_media_to_gemini(), name="stream_media_to_gemini"))
265
- tasks.append(asyncio.create_task(self.process_gemini_responses(), name="process_gemini_responses"))
266
- tasks.append(asyncio.create_task(self.play_gemini_audio(), name="play_gemini_audio"))
 
 
 
267
  logging.info("All Gemini interaction tasks created.")
268
  # Wait for tasks to complete or raise an exception
269
  done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
270
  # Check results of completed tasks for errors
271
  for future in done:
272
  try:
273
- future.result() # Raise exception if task failed
274
  except Exception as task_exc:
275
- logging.error(f"Task {future.get_name()} failed: {task_exc}", exc_info=True)
 
276
  # Optionally cancel remaining tasks if one fails critically
277
- for p_task in pending: p_task.cancel()
 
278
  # If loop completes normally (e.g., user stops), pending tasks will be handled by finally block
279
- except Exception as e_gather: # Catch errors during task creation/gathering
280
- logging.error(f"Error during task management: {e_gather}", exc_info=True)
 
281
  for task in tasks:
282
- if not task.done(): task.cancel()
 
283
  # Wait for cancellations to complete
284
  await asyncio.gather(*tasks, return_exceptions=True)
285
  logging.info("Gemini interaction tasks finished or cancelled.")
286
 
287
- except asyncio.CancelledError: logging.info("GeminiInteractionLoop.run_main_loop() was cancelled.")
288
- except Exception as e: # General catch-all, including ConnectionClosedError
289
- logging.error(f"Exception in GeminiInteractionLoop run_main_loop: {type(e).__name__}: {e}", exc_info=True)
 
 
290
  finally:
291
  logging.info("GeminiInteractionLoop.run_main_loop() finishing...")
292
  self.is_running = False
293
- self.signal_stop() # Ensure sentinels are sent
294
  # Clean up any remaining tasks (important if gather didn't complete)
295
  # current_tasks = [t for t in asyncio.all_tasks(self.async_event_loop) if t is not asyncio.current_task()]
296
  # if current_tasks:
@@ -302,7 +389,8 @@ class GeminiInteractionLoop:
302
  video_frames_to_gemini_q = None
303
  audio_chunks_to_gemini_q = None
304
  audio_from_gemini_playback_q = None
305
- logging.info("GeminiInteractionLoop finished and global queues set to None.")
 
306
 
307
 
308
  # --- WebRTC Media Processors ---
@@ -312,10 +400,12 @@ class VideoProcessor(VideoProcessorBase):
312
  self.last_gemini_send_time = time.monotonic()
313
 
314
  async def _process_and_queue_frame_async(self, frame_ndarray):
315
- if video_frames_to_gemini_q is None: return
 
316
  self.frame_counter += 1
317
  current_time = time.monotonic()
318
- if (current_time - self.last_gemini_send_time) < (1.0 / VIDEO_FPS_TO_GEMINI): return
 
319
  self.last_gemini_send_time = current_time
320
  try:
321
  img_rgb = cv2.cvtColor(frame_ndarray, cv2.COLOR_BGR2RGB)
@@ -324,40 +414,57 @@ class VideoProcessor(VideoProcessorBase):
324
  image_io = io.BytesIO()
325
  pil_img.save(image_io, format="jpeg")
326
  image_bytes = image_io.getvalue()
327
- api_data = {"mime_type": "image/jpeg", "data": base64.b64encode(image_bytes).decode()}
 
328
  if video_frames_to_gemini_q.full():
329
- try: await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.01)
330
- except asyncio.TimeoutError: logging.warning("Video queue full, frame dropped."); return
 
 
 
331
  video_frames_to_gemini_q.put_nowait(api_data)
332
- except Exception as e: logging.error(f"Error processing/queueing video frame: {e}", exc_info=True)
 
 
333
 
334
  async def recv(self, frame):
335
  img_bgr = frame.to_ndarray(format="bgr24")
336
  try:
337
  loop = asyncio.get_running_loop()
338
  loop.create_task(self._process_and_queue_frame_async(img_bgr))
339
- except RuntimeError: logging.error("VideoProcessor.recv: No running asyncio loop in current thread for create_task.")
 
 
340
  return frame
341
 
 
342
  class AudioProcessor(AudioProcessorBase):
343
  async def _process_and_queue_audio_async(self, audio_frames):
344
- if audio_chunks_to_gemini_q is None: return
 
345
  for frame in audio_frames:
346
  audio_data = frame.planes[0].to_bytes()
347
  mime_type = f"audio/L16;rate={frame.sample_rate};channels={frame.layout.channels}"
348
  api_data = {"data": audio_data, "mime_type": mime_type}
349
  try:
350
  if audio_chunks_to_gemini_q.full():
351
- try: await asyncio.wait_for(audio_chunks_to_gemini_q.get(), timeout=0.01)
352
- except asyncio.TimeoutError: logging.warning("Audio queue full, chunk dropped."); continue
 
 
 
353
  audio_chunks_to_gemini_q.put_nowait(api_data)
354
- except Exception as e: logging.error(f"Error queueing audio chunk: {e}", exc_info=True)
355
-
 
 
356
  async def recv(self, frames):
357
  try:
358
  loop = asyncio.get_running_loop()
359
  loop.create_task(self._process_and_queue_audio_async(frames))
360
- except RuntimeError: logging.error("AudioProcessor.recv: No running asyncio loop in current thread for create_task.")
 
 
361
  return frames
362
 
363
  # --- Streamlit UI and Application Logic ---
 
11
  import logging
12
  from dotenv import load_dotenv
13
 
14
+ import cv2
15
+ import pyaudio
16
  import PIL.Image
17
 
18
  from google import genai
19
  from google.genai import types
20
 
 
21
  from streamlit_webrtc import (
22
  webrtc_streamer,
23
  WebRtcMode,
24
  AudioProcessorBase,
25
  VideoProcessorBase,
26
  )
 
27
 
 
28
  load_dotenv()
29
+
 
30
 
31
  # Audio configuration
32
+
33
+ FORMAT = pyaudio.paInt16
34
+ CHANNELS = 1
35
+ SEND_SAMPLE_RATE = 16000
36
+ RECEIVE_SAMPLE_RATE = 24000
37
+ CHUNK_SIZE = 1024
38
+
39
+ # Map PyAudio format to a more descriptive name for clarity.
40
+ PYAUDIO_FORMAT = FORMAT # pyaudio.paInt16
41
+ PYAUDIO_CHANNELS = CHANNELS
42
+ PYAUDIO_PLAYBACK_CHUNK_SIZE = CHUNK_SIZE
43
+ GEMINI_AUDIO_RECEIVE_SAMPLE_RATE = RECEIVE_SAMPLE_RATE
44
 
45
  # Video configuration
46
+
47
  VIDEO_FPS_TO_GEMINI = 2
48
  VIDEO_API_RESIZE = (1024, 1024)
49
 
50
+ # Queue sizes
51
+ MEDIA_TO_GEMINI_QUEUE_MAXSIZE = 10
52
+ AUDIO_PLAYBACK_QUEUE_MAXSIZE = 10
53
+
54
+ # WebRTC settings
55
+ WEBRTC_REQUESTED_SEND_SAMPLE_RATE = SEND_SAMPLE_RATE
56
+ WEBRTC_REQUESTED_AUDIO_CHANNELS = CHANNELS
57
+
58
+
59
  # !!! IMPORTANT: Verify this model name is correct for the Live API !!!
60
  MODEL_NAME = "models/gemini-2.0-flash-live-001"
61
  logging.info(f"Using Gemini Model: {MODEL_NAME}")
 
78
  pya = None
79
  try:
80
  pya = pyaudio.PyAudio()
81
+
82
  def cleanup_pyaudio():
83
  logging.info("Terminating PyAudio instance.")
84
+ if pya:
85
+ pya.terminate()
86
  atexit.register(cleanup_pyaudio)
87
  logging.info("PyAudio initialized successfully.")
88
  except Exception as e_pyaudio:
89
+ logging.warning(
90
+ f"PyAudio initialization failed (expected in some server environments): {e_pyaudio}")
91
  pya = None
92
 
93
  # --- Global Queues - Declare as None, initialize later ---
 
107
  logging.critical(f"Gemini client initialization failed: {e}", exc_info=True)
108
  st.stop()
109
  else:
110
+ st.error(
111
+ "GEMINI_API_KEY not found in environment variables. Please set it for the application to run.")
112
  logging.critical("GEMINI_API_KEY not found.")
113
  st.stop()
114
 
115
  LIVE_CONNECT_CONFIG = types.LiveConnectConfig(
116
+ response_modalities=["audio", "text", "video"], # Requesting audio, text, and video responses
117
  speech_config=types.SpeechConfig(
118
  voice_config=types.VoiceConfig(
119
+ # Using Zephyr voice
120
+ prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr")
121
  )
122
+ ) # closes speech_config
123
  )
124
  logging.info(f"Attempting connection with LiveConnectConfig: {LIVE_CONNECT_CONFIG}")
125
 
 
134
 
135
  async def send_text_input_to_gemini(self, user_text):
136
  if not user_text or not self.gemini_session or not self.is_running:
137
+ logging.warning(
138
+ "Cannot send text. Session not active, no text, or not running.")
139
  return
140
  try:
141
  logging.info(f"Sending text to Gemini: '{user_text[:50]}...'")
 
143
  # For now, keeping session.send as it was working functionally
144
  await self.gemini_session.send(input=user_text, end_of_turn=True)
145
  except Exception as e:
146
+ logging.error(
147
+ f"Error sending text message to Gemini: {e}", exc_info=True)
148
 
149
  async def stream_media_to_gemini(self):
150
  logging.info("Task started: Stream media from WebRTC queues to Gemini.")
151
+
152
  async def get_media_from_queues():
153
  if video_frames_to_gemini_q is None or audio_chunks_to_gemini_q is None:
154
+ await asyncio.sleep(0.1)
155
+ return None
156
  try:
157
  video_frame = await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.02)
158
+ if video_frame is None:
159
+ return None # Sentinel received
160
+ video_frames_to_gemini_q.task_done()
161
+ return video_frame
162
+ except asyncio.TimeoutError:
163
+ pass
164
+ except Exception as e:
165
+ logging.error(f"Error getting video from queue: {e}", exc_info=True)
166
  try:
167
  audio_chunk = await asyncio.wait_for(audio_chunks_to_gemini_q.get(), timeout=0.02)
168
+ if audio_chunk is None:
169
+ return None # Sentinel received
170
+ audio_chunks_to_gemini_q.task_done()
171
+ return audio_chunk
172
+ except asyncio.TimeoutError:
173
+ return None
174
+ except Exception as e:
175
+ logging.error(f"Error getting audio from queue: {e}", exc_info=True)
176
+ return None
177
 
178
  try:
179
  while self.is_running:
180
+ if not self.gemini_session:
181
+ await asyncio.sleep(0.1)
182
+ continue
183
  media_data = await get_media_from_queues()
184
+ if media_data is None and not self.is_running:
185
+ break # Sentinel and stop signal
186
  if media_data and self.gemini_session and self.is_running:
187
  try:
188
  # Use the specific method as suggested by the deprecation warning if possible
189
  # For now, keeping session.send as it was working functionally
190
  await self.gemini_session.send(input=media_data)
191
+ except Exception as e:
192
+ logging.error(
193
+ f"Error sending media chunk to Gemini: {e}", exc_info=True)
194
+ elif not media_data:
195
+ await asyncio.sleep(0.05) # No data, yield
196
+ except asyncio.CancelledError:
197
+ logging.info("Task cancelled: stream_media_to_gemini.")
198
+ finally:
199
+ logging.info("Task finished: stream_media_to_gemini.")
200
 
201
  async def process_gemini_responses(self):
202
  logging.info("Task started: Process responses from Gemini.")
203
  try:
204
  while self.is_running:
205
+ if not self.gemini_session:
206
+ await asyncio.sleep(0.1)
207
+ continue
208
+ if audio_from_gemini_playback_q is None:
209
+ await asyncio.sleep(0.1)
210
+ continue
211
  try:
212
  turn_response = self.gemini_session.receive()
213
  async for chunk in turn_response:
214
+ if not self.is_running:
215
+ break
216
  if audio_data := chunk.data:
217
+ if not audio_from_gemini_playback_q.full():
218
+ audio_from_gemini_playback_q.put_nowait(audio_data)
219
+ else:
220
+ logging.warning(
221
+ "Audio playback queue full, discarding Gemini audio data.")
222
  if text_response := chunk.text:
223
  logging.info(f"Gemini text response: {text_response[:100]}")
224
+ if 'chat_messages' not in st.session_state:
225
+ st.session_state.chat_messages = []
226
+ st.session_state.chat_messages = st.session_state.chat_messages + [
227
+ {"role": "assistant", "content": text_response}]
228
+ except types.generation_types.StopCandidateException:
229
+ logging.info("Gemini response stream ended normally.")
230
  except Exception as e:
231
+ if self.is_running:
232
+ logging.error(
233
+ f"Error receiving from Gemini: {e}", exc_info=True)
234
  await asyncio.sleep(0.1)
235
+ except asyncio.CancelledError:
236
+ logging.info("Task cancelled: process_gemini_responses.")
237
+ finally:
238
+ logging.info("Task finished: process_gemini_responses.")
239
 
240
  async def play_gemini_audio(self):
241
  logging.info("Task started: Play Gemini audio responses.")
242
  if pya is None:
243
+ logging.warning(
244
+ "PyAudio not available. Audio playback task will not run.")
245
+ return
246
 
247
  try:
248
+ while audio_from_gemini_playback_q is None and self.is_running:
249
+ await asyncio.sleep(0.1)
250
+ if not self.is_running:
251
+ return
252
 
253
  self.playback_stream = await asyncio.to_thread(
254
  pya.open, format=PYAUDIO_FORMAT, channels=PYAUDIO_CHANNELS, rate=GEMINI_AUDIO_RECEIVE_SAMPLE_RATE, output=True, frames_per_buffer=PYAUDIO_PLAYBACK_CHUNK_SIZE
255
  )
256
+ logging.info(
257
+ f"PyAudio playback stream opened at {GEMINI_AUDIO_RECEIVE_SAMPLE_RATE} Hz.")
258
  while self.is_running:
259
  try:
260
  audio_chunk = await asyncio.wait_for(audio_from_gemini_playback_q.get(), timeout=1.0)
261
+ if audio_chunk is None and not self.is_running:
262
+ break # Sentinel and stop signal
263
+ if audio_chunk:
264
+ await asyncio.to_thread(self.playback_stream.write, audio_chunk)
265
+ if audio_chunk:
266
+ audio_from_gemini_playback_q.task_done()
267
+ except asyncio.TimeoutError:
268
+ continue
269
+ except Exception as e:
270
+ logging.error(f"Error playing audio chunk: {e}", exc_info=True)
271
+ await asyncio.sleep(0.01)
272
  except Exception as e:
273
+ logging.error(
274
+ f"Failed to open or use PyAudio playback stream (might be expected in this environment): {e}", exc_info=True)
275
  finally:
276
  if self.playback_stream:
277
  logging.info("Stopping and closing PyAudio playback stream.")
278
  try:
279
  await asyncio.to_thread(self.playback_stream.stop_stream)
280
  await asyncio.to_thread(self.playback_stream.close)
281
+ except Exception as e_close:
282
+ logging.error(
283
+ f"Error closing playback stream: {e_close}", exc_info=True)
284
  self.playback_stream = None
285
  logging.info("Task finished: play_gemini_audio.")
286
 
 
291
  ("audio_in_q", audio_chunks_to_gemini_q),
292
  ("audio_out_q", audio_from_gemini_playback_q)]:
293
  if q_obj_ref:
294
+ try:
295
+ q_obj_ref.put_nowait(None)
296
+ except asyncio.QueueFull:
297
+ logging.warning(
298
+ f"Queue {q_name} was full when trying to put sentinel for stop signal.")
299
+ except Exception as e:
300
+ logging.error(
301
+ f"Error putting sentinel in {q_name}: {e}", exc_info=True)
302
 
303
  async def run_main_loop(self):
304
  global video_frames_to_gemini_q, audio_chunks_to_gemini_q, audio_from_gemini_playback_q
 
307
  self.is_running = True
308
  logging.info("GeminiInteractionLoop run_main_loop starting...")
309
 
310
+ video_frames_to_gemini_q = asyncio.Queue(
311
+ maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
312
+ audio_chunks_to_gemini_q = asyncio.Queue(
313
+ maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
314
+ audio_from_gemini_playback_q = asyncio.Queue(
315
+ maxsize=AUDIO_PLAYBACK_QUEUE_MAXSIZE)
316
  logging.info("Asyncio queues initialized in GeminiInteractionLoop.")
317
 
318
+ if client is None:
319
+ logging.critical(
320
+ "Gemini client is None in run_main_loop. Aborting.")
321
+ return
322
 
323
  try:
324
  async with client.aio.live.connect(model=MODEL_NAME, config=LIVE_CONNECT_CONFIG) as session:
325
  self.gemini_session = session
326
+ logging.info(
327
+ f"Gemini session established with API for model {MODEL_NAME}.")
328
  try:
329
  logging.info("Sending system prompt to Gemini...")
330
  await self.gemini_session.send(input=MEDICAL_ASSISTANT_SYSTEM_PROMPT, end_of_turn=False)
331
  logging.info("System prompt sent successfully.")
332
  except Exception as e:
333
+ logging.error(
334
+ f"Failed to send system prompt: {e}", exc_info=True)
335
+ self.is_running = False
336
+ return
337
 
338
  # Using asyncio.gather for Python 3.9 compatibility
339
  tasks = []
340
  try:
341
  logging.info("Creating async tasks for Gemini interaction...")
342
+ tasks.append(asyncio.create_task(
343
+ self.stream_media_to_gemini(), name="stream_media_to_gemini"))
344
+ tasks.append(asyncio.create_task(
345
+ self.process_gemini_responses(), name="process_gemini_responses"))
346
+ tasks.append(asyncio.create_task(
347
+ self.play_gemini_audio(), name="play_gemini_audio"))
348
  logging.info("All Gemini interaction tasks created.")
349
  # Wait for tasks to complete or raise an exception
350
  done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
351
  # Check results of completed tasks for errors
352
  for future in done:
353
  try:
354
+ future.result() # Raise exception if task failed
355
  except Exception as task_exc:
356
+ logging.error(
357
+ f"Task {future.get_name()} failed: {task_exc}", exc_info=True)
358
  # Optionally cancel remaining tasks if one fails critically
359
+ for p_task in pending:
360
+ p_task.cancel()
361
  # If loop completes normally (e.g., user stops), pending tasks will be handled by finally block
362
+ except Exception as e_gather: # Catch errors during task creation/gathering
363
+ logging.error(
364
+ f"Error during task management: {e_gather}", exc_info=True)
365
  for task in tasks:
366
+ if not task.done():
367
+ task.cancel()
368
  # Wait for cancellations to complete
369
  await asyncio.gather(*tasks, return_exceptions=True)
370
  logging.info("Gemini interaction tasks finished or cancelled.")
371
 
372
+ except asyncio.CancelledError:
373
+ logging.info("GeminiInteractionLoop.run_main_loop() was cancelled.")
374
+ except Exception as e: # General catch-all, including ConnectionClosedError
375
+ logging.error(
376
+ f"Exception in GeminiInteractionLoop run_main_loop: {type(e).__name__}: {e}", exc_info=True)
377
  finally:
378
  logging.info("GeminiInteractionLoop.run_main_loop() finishing...")
379
  self.is_running = False
380
+ self.signal_stop() # Ensure sentinels are sent
381
  # Clean up any remaining tasks (important if gather didn't complete)
382
  # current_tasks = [t for t in asyncio.all_tasks(self.async_event_loop) if t is not asyncio.current_task()]
383
  # if current_tasks:
 
389
  video_frames_to_gemini_q = None
390
  audio_chunks_to_gemini_q = None
391
  audio_from_gemini_playback_q = None
392
+ logging.info(
393
+ "GeminiInteractionLoop finished and global queues set to None.")
394
 
395
 
396
  # --- WebRTC Media Processors ---
 
400
  self.last_gemini_send_time = time.monotonic()
401
 
402
  async def _process_and_queue_frame_async(self, frame_ndarray):
403
+ if video_frames_to_gemini_q is None:
404
+ return
405
  self.frame_counter += 1
406
  current_time = time.monotonic()
407
+ if (current_time - self.last_gemini_send_time) < (1.0 / VIDEO_FPS_TO_GEMINI):
408
+ return
409
  self.last_gemini_send_time = current_time
410
  try:
411
  img_rgb = cv2.cvtColor(frame_ndarray, cv2.COLOR_BGR2RGB)
 
414
  image_io = io.BytesIO()
415
  pil_img.save(image_io, format="jpeg")
416
  image_bytes = image_io.getvalue()
417
+ api_data = {"mime_type": "image/jpeg",
418
+ "data": base64.b64encode(image_bytes).decode()}
419
  if video_frames_to_gemini_q.full():
420
+ try:
421
+ await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.01)
422
+ except asyncio.TimeoutError:
423
+ logging.warning("Video queue full, frame dropped.")
424
+ return
425
  video_frames_to_gemini_q.put_nowait(api_data)
426
+ except Exception as e:
427
+ logging.error(
428
+ f"Error processing/queueing video frame: {e}", exc_info=True)
429
 
430
  async def recv(self, frame):
431
  img_bgr = frame.to_ndarray(format="bgr24")
432
  try:
433
  loop = asyncio.get_running_loop()
434
  loop.create_task(self._process_and_queue_frame_async(img_bgr))
435
+ except RuntimeError:
436
+ logging.error(
437
+ "VideoProcessor.recv: No running asyncio loop in current thread for create_task.")
438
  return frame
439
 
440
+
441
  class AudioProcessor(AudioProcessorBase):
442
  async def _process_and_queue_audio_async(self, audio_frames):
443
+ if audio_chunks_to_gemini_q is None:
444
+ return
445
  for frame in audio_frames:
446
  audio_data = frame.planes[0].to_bytes()
447
  mime_type = f"audio/L16;rate={frame.sample_rate};channels={frame.layout.channels}"
448
  api_data = {"data": audio_data, "mime_type": mime_type}
449
  try:
450
  if audio_chunks_to_gemini_q.full():
451
+ try:
452
+ await asyncio.wait_for(audio_chunks_to_gemini_q.get(), timeout=0.01)
453
+ except asyncio.TimeoutError:
454
+ logging.warning("Audio queue full, chunk dropped.")
455
+ continue
456
  audio_chunks_to_gemini_q.put_nowait(api_data)
457
+ except Exception as e:
458
+ logging.error(
459
+ f"Error queueing audio chunk: {e}", exc_info=True)
460
+
461
  async def recv(self, frames):
462
  try:
463
  loop = asyncio.get_running_loop()
464
  loop.create_task(self._process_and_queue_audio_async(frames))
465
+ except RuntimeError:
466
+ logging.error(
467
+ "AudioProcessor.recv: No running asyncio loop in current thread for create_task.")
468
  return frames
469
 
470
  # --- Streamlit UI and Application Logic ---