noumanjavaid commited on
Commit
99636da
·
verified ·
1 Parent(s): a46c042

Update src/streamlit_app.py

Browse files
Files changed (1) hide show
  1. src/streamlit_app.py +76 -27
src/streamlit_app.py CHANGED
@@ -6,7 +6,7 @@ import base64
6
  import io
7
  import threading
8
  import traceback
9
- import atexit # Correctly imported
10
  import time
11
  import logging
12
  from dotenv import load_dotenv
@@ -24,21 +24,20 @@ from streamlit_webrtc import (
24
  WebRtcMode,
25
  AudioProcessorBase,
26
  VideoProcessorBase,
27
- # ClientSettings # Removed as it's not used in this version
28
  )
29
- # from aiortc import RTCIceServer, RTCConfiguration # RTCConfiguration object not needed directly for webrtc_streamer
30
 
31
  # --- Configuration ---
32
  load_dotenv()
33
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s')
34
 
35
  # Audio configuration
36
- PYAUDIO_FORMAT = pyaudio.paInt16 # For PyAudio playback
37
- PYAUDIO_CHANNELS = 1 # For PyAudio playback
38
- WEBRTC_REQUESTED_AUDIO_CHANNELS = 1 # Request mono audio from WebRTC
39
- WEBRTC_REQUESTED_SEND_SAMPLE_RATE = 16000 # Target sample rate for audio sent to Gemini
40
- GEMINI_AUDIO_RECEIVE_SAMPLE_RATE = 24000 # Gemini documentation recommendation for its TTS
41
- PYAUDIO_PLAYBACK_CHUNK_SIZE = 1024 # For PyAudio playback
42
  AUDIO_PLAYBACK_QUEUE_MAXSIZE = 50
43
  MEDIA_TO_GEMINI_QUEUE_MAXSIZE = 30
44
 
@@ -72,10 +71,10 @@ def cleanup_pyaudio():
72
  pya.terminate()
73
  atexit.register(cleanup_pyaudio)
74
 
75
- # --- Global Queues for WebRTC to Backend Communication ---
76
- video_frames_to_gemini_q = asyncio.Queue(maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
77
- audio_chunks_to_gemini_q = asyncio.Queue(maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
78
- audio_from_gemini_playback_q = asyncio.Queue(maxsize=AUDIO_PLAYBACK_QUEUE_MAXSIZE)
79
 
80
  # --- Gemini Client Setup ---
81
  GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
@@ -106,6 +105,8 @@ class GeminiInteractionLoop:
106
  self.async_event_loop = None
107
  self.is_running = True
108
  self.playback_stream = None
 
 
109
 
110
  async def send_text_input_to_gemini(self, user_text):
111
  if not user_text or not self.gemini_session or not self.is_running:
@@ -120,6 +121,10 @@ class GeminiInteractionLoop:
120
  async def stream_media_to_gemini(self):
121
  logging.info("Task started: Stream media from WebRTC queues to Gemini.")
122
  async def get_media_from_queues():
 
 
 
 
123
  try:
124
  video_frame = await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.02)
125
  video_frames_to_gemini_q.task_done()
@@ -154,6 +159,8 @@ class GeminiInteractionLoop:
154
  while self.is_running:
155
  if not self.gemini_session:
156
  await asyncio.sleep(0.1); continue
 
 
157
  try:
158
  turn_response = self.gemini_session.receive()
159
  async for chunk in turn_response:
@@ -176,6 +183,11 @@ class GeminiInteractionLoop:
176
  async def play_gemini_audio(self):
177
  logging.info("Task started: Play Gemini audio responses.")
178
  try:
 
 
 
 
 
179
  self.playback_stream = await asyncio.to_thread(
180
  pya.open, format=PYAUDIO_FORMAT, channels=PYAUDIO_CHANNELS, rate=GEMINI_AUDIO_RECEIVE_SAMPLE_RATE, output=True, frames_per_buffer=PYAUDIO_PLAYBACK_CHUNK_SIZE
181
  )
@@ -202,15 +214,26 @@ class GeminiInteractionLoop:
202
  def signal_stop(self):
203
  logging.info("Signal to stop GeminiInteractionLoop received.")
204
  self.is_running = False
 
205
  for q in [video_frames_to_gemini_q, audio_chunks_to_gemini_q, audio_from_gemini_playback_q]:
206
- try: q.put_nowait(None)
207
- except asyncio.QueueFull: logging.warning(f"Queue was full when trying to put sentinel for stop signal.")
208
- except Exception as e: logging.error(f"Error putting sentinel in queue: {e}", exc_info=True)
 
209
 
210
  async def run_main_loop(self):
 
 
211
  self.async_event_loop = asyncio.get_running_loop()
212
  self.is_running = True
213
  logging.info("GeminiInteractionLoop run_main_loop starting...")
 
 
 
 
 
 
 
214
  if client is None:
215
  logging.critical("Gemini client is None in run_main_loop. Aborting.")
216
  return
@@ -245,7 +268,11 @@ class GeminiInteractionLoop:
245
  logging.info("GeminiInteractionLoop.run_main_loop() finishing...")
246
  self.is_running = False
247
  self.gemini_session = None
248
- logging.info("GeminiInteractionLoop finished.")
 
 
 
 
249
 
250
  # --- WebRTC Media Processors ---
251
  class VideoProcessor(VideoProcessorBase):
@@ -254,6 +281,10 @@ class VideoProcessor(VideoProcessorBase):
254
  self.last_gemini_send_time = time.monotonic()
255
 
256
  async def _process_and_queue_frame_async(self, frame_ndarray):
 
 
 
 
257
  self.frame_counter += 1
258
  current_time = time.monotonic()
259
  if (current_time - self.last_gemini_send_time) < (1.0 / VIDEO_FPS_TO_GEMINI):
@@ -277,11 +308,22 @@ class VideoProcessor(VideoProcessorBase):
277
 
278
  async def recv(self, frame):
279
  img_bgr = frame.to_ndarray(format="bgr24")
280
- asyncio.create_task(self._process_and_queue_frame_async(img_bgr))
 
 
 
 
 
 
 
281
  return frame
282
 
283
  class AudioProcessor(AudioProcessorBase):
284
  async def _process_and_queue_audio_async(self, audio_frames):
 
 
 
 
285
  for frame in audio_frames:
286
  audio_data = frame.planes[0].to_bytes()
287
  mime_type = f"audio/L16;rate={frame.sample_rate};channels={frame.layout.channels}"
@@ -295,7 +337,12 @@ class AudioProcessor(AudioProcessorBase):
295
  except Exception as e: logging.error(f"Error queueing audio chunk: {e}", exc_info=True)
296
 
297
  async def recv(self, frames):
298
- asyncio.create_task(self._process_and_queue_audio_async(frames))
 
 
 
 
 
299
  return frames
300
 
301
  # --- Streamlit UI and Application Logic ---
@@ -324,10 +371,7 @@ def run_streamlit_app():
324
  st.session_state.gemini_session_active = True
325
  st.session_state.chat_messages = [{"role": "system", "content": "Assistant activating. Please allow camera/microphone access in your browser if prompted."}]
326
 
327
- for q in [video_frames_to_gemini_q, audio_chunks_to_gemini_q, audio_from_gemini_playback_q]:
328
- while not q.empty():
329
- try: q.get_nowait()
330
- except asyncio.QueueEmpty: break
331
 
332
  gemini_loop = GeminiInteractionLoop()
333
  st.session_state.gemini_loop_instance = gemini_loop
@@ -358,15 +402,20 @@ def run_streamlit_app():
358
  }
359
  }
360
 
 
 
 
 
 
361
  webrtc_ctx = webrtc_streamer(
362
  key=st.session_state.webrtc_component_key,
363
  mode=WebRtcMode.SENDONLY,
364
- rtc_configuration={ # MODIFIED HERE: Pass dictionary directly
365
  "iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]
366
  },
367
  media_stream_constraints=MEDIA_STREAM_CONSTRAINTS,
368
- video_processor_factory=VideoProcessor,
369
- audio_processor_factory=AudioProcessor,
370
  async_processing=True,
371
  )
372
 
@@ -374,7 +423,7 @@ def run_streamlit_app():
374
  st.caption("WebRTC connected. Streaming your camera and microphone.")
375
  elif st.session_state.gemini_session_active:
376
  st.caption("WebRTC attempting to connect. Ensure camera/microphone permissions are granted in your browser.")
377
- if hasattr(webrtc_ctx.state, 'error') and webrtc_ctx.state.error: # Check if error attribute exists
378
  st.error(f"WebRTC Connection Error: {webrtc_ctx.state.error}")
379
  else:
380
  st.info("Click 'Start Session' in the sidebar to enable the live feed and assistant.")
 
6
  import io
7
  import threading
8
  import traceback
9
+ import atexit
10
  import time
11
  import logging
12
  from dotenv import load_dotenv
 
24
  WebRtcMode,
25
  AudioProcessorBase,
26
  VideoProcessorBase,
 
27
  )
28
+ # from aiortc import RTCIceServer, RTCConfiguration # RTCConfiguration object not needed directly
29
 
30
  # --- Configuration ---
31
  load_dotenv()
32
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s')
33
 
34
  # Audio configuration
35
+ PYAUDIO_FORMAT = pyaudio.paInt16
36
+ PYAUDIO_CHANNELS = 1
37
+ WEBRTC_REQUESTED_AUDIO_CHANNELS = 1
38
+ WEBRTC_REQUESTED_SEND_SAMPLE_RATE = 16000
39
+ GEMINI_AUDIO_RECEIVE_SAMPLE_RATE = 24000
40
+ PYAUDIO_PLAYBACK_CHUNK_SIZE = 1024
41
  AUDIO_PLAYBACK_QUEUE_MAXSIZE = 50
42
  MEDIA_TO_GEMINI_QUEUE_MAXSIZE = 30
43
 
 
71
  pya.terminate()
72
  atexit.register(cleanup_pyaudio)
73
 
74
+ # --- Global Queues - Declare as None, initialize later ---
75
+ video_frames_to_gemini_q: asyncio.Queue = None
76
+ audio_chunks_to_gemini_q: asyncio.Queue = None
77
+ audio_from_gemini_playback_q: asyncio.Queue = None
78
 
79
  # --- Gemini Client Setup ---
80
  GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
 
105
  self.async_event_loop = None
106
  self.is_running = True
107
  self.playback_stream = None
108
+ # Queues will be initialized in run_main_loop and assigned to global vars
109
+ # This class will use the global queue variables directly
110
 
111
  async def send_text_input_to_gemini(self, user_text):
112
  if not user_text or not self.gemini_session or not self.is_running:
 
121
  async def stream_media_to_gemini(self):
122
  logging.info("Task started: Stream media from WebRTC queues to Gemini.")
123
  async def get_media_from_queues():
124
+ # Ensure queues are initialized before trying to get from them
125
+ if video_frames_to_gemini_q is None or audio_chunks_to_gemini_q is None:
126
+ await asyncio.sleep(0.1) # Wait for queues to be initialized
127
+ return None
128
  try:
129
  video_frame = await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.02)
130
  video_frames_to_gemini_q.task_done()
 
159
  while self.is_running:
160
  if not self.gemini_session:
161
  await asyncio.sleep(0.1); continue
162
+ if audio_from_gemini_playback_q is None: # Wait for queue init
163
+ await asyncio.sleep(0.1); continue
164
  try:
165
  turn_response = self.gemini_session.receive()
166
  async for chunk in turn_response:
 
183
  async def play_gemini_audio(self):
184
  logging.info("Task started: Play Gemini audio responses.")
185
  try:
186
+ # Wait for the playback queue to be initialized
187
+ while audio_from_gemini_playback_q is None and self.is_running:
188
+ await asyncio.sleep(0.1)
189
+ if not self.is_running: return
190
+
191
  self.playback_stream = await asyncio.to_thread(
192
  pya.open, format=PYAUDIO_FORMAT, channels=PYAUDIO_CHANNELS, rate=GEMINI_AUDIO_RECEIVE_SAMPLE_RATE, output=True, frames_per_buffer=PYAUDIO_PLAYBACK_CHUNK_SIZE
193
  )
 
214
  def signal_stop(self):
215
  logging.info("Signal to stop GeminiInteractionLoop received.")
216
  self.is_running = False
217
+ # Use global queue variables directly
218
  for q in [video_frames_to_gemini_q, audio_chunks_to_gemini_q, audio_from_gemini_playback_q]:
219
+ if q: # Check if queue is initialized
220
+ try: q.put_nowait(None)
221
+ except asyncio.QueueFull: logging.warning(f"Queue was full when trying to put sentinel for stop signal.")
222
+ except Exception as e: logging.error(f"Error putting sentinel in queue: {e}", exc_info=True)
223
 
224
  async def run_main_loop(self):
225
+ global video_frames_to_gemini_q, audio_chunks_to_gemini_q, audio_from_gemini_playback_q # Allow modification of global vars
226
+
227
  self.async_event_loop = asyncio.get_running_loop()
228
  self.is_running = True
229
  logging.info("GeminiInteractionLoop run_main_loop starting...")
230
+
231
+ # Initialize queues here, within the asyncio loop of this thread
232
+ video_frames_to_gemini_q = asyncio.Queue(maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
233
+ audio_chunks_to_gemini_q = asyncio.Queue(maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
234
+ audio_from_gemini_playback_q = asyncio.Queue(maxsize=AUDIO_PLAYBACK_QUEUE_MAXSIZE)
235
+ logging.info("Asyncio queues initialized in GeminiInteractionLoop.")
236
+
237
  if client is None:
238
  logging.critical("Gemini client is None in run_main_loop. Aborting.")
239
  return
 
268
  logging.info("GeminiInteractionLoop.run_main_loop() finishing...")
269
  self.is_running = False
270
  self.gemini_session = None
271
+ # Clear queues on exit to prevent issues if loop restarts
272
+ video_frames_to_gemini_q = None
273
+ audio_chunks_to_gemini_q = None
274
+ audio_from_gemini_playback_q = None
275
+ logging.info("GeminiInteractionLoop finished and queues cleared.")
276
 
277
  # --- WebRTC Media Processors ---
278
  class VideoProcessor(VideoProcessorBase):
 
281
  self.last_gemini_send_time = time.monotonic()
282
 
283
  async def _process_and_queue_frame_async(self, frame_ndarray):
284
+ if video_frames_to_gemini_q is None: # Wait for queue to be initialized
285
+ logging.debug("VideoProcessor: video_frames_to_gemini_q is None, waiting...")
286
+ return
287
+
288
  self.frame_counter += 1
289
  current_time = time.monotonic()
290
  if (current_time - self.last_gemini_send_time) < (1.0 / VIDEO_FPS_TO_GEMINI):
 
308
 
309
  async def recv(self, frame):
310
  img_bgr = frame.to_ndarray(format="bgr24")
311
+ # Ensure an event loop is running in the current thread for create_task
312
+ try:
313
+ loop = asyncio.get_running_loop()
314
+ loop.create_task(self._process_and_queue_frame_async(img_bgr))
315
+ except RuntimeError: # No running loop in this thread (should not happen with streamlit-webrtc async_processing=True)
316
+ logging.error("VideoProcessor.recv: No running asyncio loop in current thread for create_task.")
317
+ # Fallback or log error, direct call might block WebRTC thread
318
+ # await self._process_and_queue_frame_async(img_bgr) # Potentially blocking
319
  return frame
320
 
321
  class AudioProcessor(AudioProcessorBase):
322
  async def _process_and_queue_audio_async(self, audio_frames):
323
+ if audio_chunks_to_gemini_q is None: # Wait for queue to be initialized
324
+ logging.debug("AudioProcessor: audio_chunks_to_gemini_q is None, waiting...")
325
+ return
326
+
327
  for frame in audio_frames:
328
  audio_data = frame.planes[0].to_bytes()
329
  mime_type = f"audio/L16;rate={frame.sample_rate};channels={frame.layout.channels}"
 
337
  except Exception as e: logging.error(f"Error queueing audio chunk: {e}", exc_info=True)
338
 
339
  async def recv(self, frames):
340
+ try:
341
+ loop = asyncio.get_running_loop()
342
+ loop.create_task(self._process_and_queue_audio_async(frames))
343
+ except RuntimeError:
344
+ logging.error("AudioProcessor.recv: No running asyncio loop in current thread for create_task.")
345
+ # await self._process_and_queue_audio_async(frames) # Potentially blocking
346
  return frames
347
 
348
  # --- Streamlit UI and Application Logic ---
 
371
  st.session_state.gemini_session_active = True
372
  st.session_state.chat_messages = [{"role": "system", "content": "Assistant activating. Please allow camera/microphone access in your browser if prompted."}]
373
 
374
+ # Queues will be initialized inside GeminiInteractionLoop's thread
 
 
 
375
 
376
  gemini_loop = GeminiInteractionLoop()
377
  st.session_state.gemini_loop_instance = gemini_loop
 
402
  }
403
  }
404
 
405
+ # Only render WebRTC streamer if queues are expected to be initialized by the Gemini loop
406
+ # This is a bit of a race condition check, might need refinement
407
+ # A better way would be for Gemini loop to signal when queues are ready.
408
+ # For now, we assume if session is active, loop is trying to start and init queues.
409
+
410
  webrtc_ctx = webrtc_streamer(
411
  key=st.session_state.webrtc_component_key,
412
  mode=WebRtcMode.SENDONLY,
413
+ rtc_configuration={
414
  "iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]
415
  },
416
  media_stream_constraints=MEDIA_STREAM_CONSTRAINTS,
417
+ video_processor_factory=VideoProcessor, # Pass the class
418
+ audio_processor_factory=AudioProcessor, # Pass the class
419
  async_processing=True,
420
  )
421
 
 
423
  st.caption("WebRTC connected. Streaming your camera and microphone.")
424
  elif st.session_state.gemini_session_active:
425
  st.caption("WebRTC attempting to connect. Ensure camera/microphone permissions are granted in your browser.")
426
+ if hasattr(webrtc_ctx.state, 'error') and webrtc_ctx.state.error:
427
  st.error(f"WebRTC Connection Error: {webrtc_ctx.state.error}")
428
  else:
429
  st.info("Click 'Start Session' in the sidebar to enable the live feed and assistant.")