noumanjavaid committed on
Commit 76a264c · verified · 1 Parent(s): 7e3304a

Update src/streamlit_app.py

Files changed (1)
  1. src/streamlit_app.py +349 -363
src/streamlit_app.py CHANGED
@@ -5,42 +5,44 @@ import asyncio
  import base64
  import io
  import threading
- import queue # Standard library queue, not asyncio.Queue for thread-safe UI updates if needed
  import traceback
- import time # Keep time for potential future use (e.g., timestamps)
  from dotenv import load_dotenv

  # --- Import main libraries ---
  import cv2
  import pyaudio
  import PIL.Image
- import mss

  from google import genai
  from google.genai import types

  # --- Configuration ---
- load_dotenv()

  # Audio configuration
  FORMAT = pyaudio.paInt16
  CHANNELS = 1
  SEND_SAMPLE_RATE = 16000
- RECEIVE_SAMPLE_RATE = 24000 # According to Gemini documentation
  CHUNK_SIZE = 1024
- AUDIO_QUEUE_MAXSIZE = 20 # Max audio chunks to buffer for playback

  # Video configuration
  VIDEO_FPS_LIMIT = 1 # Send 1 frame per second to the API
- VIDEO_PREVIEW_RESIZE = (640, 480) # Size for Streamlit preview
- VIDEO_API_RESIZE = (1024, 1024) # Max size to send to API (adjust if needed)

  # Gemini model configuration
- MODEL = "models/gemini-2.0-flash-live-001" # Ensure this is the correct model for live capabilities
- DEFAULT_MODE = "camera" # Default video input mode

  # System Prompt for the Medical Assistant
- MEDICAL_ASSISTANT_SYSTEM_PROMPT = """You are an AI Medical Assistant. Your primary function is to analyze visual information from the user's camera or screen.

  Your responsibilities are:
  1. **Visual Observation and Description:** Carefully examine the images or video feed. Describe relevant details you observe.
@@ -56,76 +58,95 @@ Your responsibilities are:
  Example of a disclaimer you might use: "As an AI assistant, I can describe what I see, but I can't provide medical advice or diagnoses. For any health concerns, it's always best to speak with a doctor or other healthcare professional."
  """

- # Initialize Streamlit state
  def init_session_state():
-     if 'initialized' not in st.session_state:
-         st.session_state['initialized'] = False
-     if 'audio_loop' not in st.session_state:
-         st.session_state['audio_loop'] = None
-     if 'chat_messages' not in st.session_state:
-         st.session_state['chat_messages'] = []
-     if 'current_frame' not in st.session_state:
-         st.session_state['current_frame'] = None
-     if 'run_loop' not in st.session_state: # Flag to control the loop from Streamlit
-         st.session_state['run_loop'] = False
-
- # Initialize all session state variables
  init_session_state()

- # Configure page
- st.set_page_config(page_title="Real-time Medical Assistant", layout="wide")

  # Initialize Gemini client
- # Ensure API key is set in environment variables or .env file
  GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
  if not GEMINI_API_KEY:
      st.error("GEMINI_API_KEY not found. Please set it in your environment variables or a .env file.")
      st.stop()

- client = genai.Client(
-     http_options={"api_version": "v1beta"},
-     api_key=GEMINI_API_KEY,
- )

- # Configure Gemini client and response settings
- CONFIG = types.LiveConnectConfig(
-     response_modalities=["audio", "text"], # Ensure text is also enabled if you want to display AI text directly
      speech_config=types.SpeechConfig(
          voice_config=types.VoiceConfig(
-             prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Puck") # Or other preferred voice
          )
      ),
-     # If the API supports an initial_prompt field in LiveConnectConfig, it would be ideal here.
-     # As of some versions, it might not be directly available, hence sending as first message.
  )

- pya = pyaudio.PyAudio()

- class AudioLoop:
-     def __init__(self, video_mode=DEFAULT_MODE):
-         self.video_mode = video_mode
-         self.audio_in_queue = None # asyncio.Queue for audio playback
-         self.out_queue = None # asyncio.Queue for data to Gemini
-         self.session = None
-         # Tasks are managed by TaskGroup now
-         self.running = True # General flag to control async loops
-         self.audio_stream = None # PyAudio input stream
-
-     async def send_text_to_gemini(self, text_input): # Renamed from send_text to avoid confusion
-         if not text_input or not self.session or not self.running:
-             st.warning("Session not active or no text to send.")
              return
          try:
-             # User messages should typically end the turn for the AI to respond.
-             await self.session.send(input=text_input, end_of_turn=True)
-             # UI update for user message is handled in main Streamlit part
          except Exception as e:
-             st.error(f"Error sending message to Gemini: {str(e)}")
-             traceback.print_exception(e)

-     def _get_frame(self, cap):
-         ret, frame = cap.read()
          if not ret:
              return None

          frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
@@ -135,404 +156,369 @@ class AudioLoop:
          preview_img.thumbnail(VIDEO_PREVIEW_RESIZE)

          api_img = img.copy()
-         api_img.thumbnail(VIDEO_API_RESIZE)

          image_io = io.BytesIO()
          api_img.save(image_io, format="jpeg")
-         image_io.seek(0)
-         image_bytes = image_io.read()

          return {
              "preview": preview_img,
-             "api": {
-                 "mime_type": "image/jpeg",
-                 "data": base64.b64encode(image_bytes).decode()
-             }
          }

-     async def get_frames_from_camera(self): # Renamed for clarity
-         cap = None
          try:
-             cap = await asyncio.to_thread(cv2.VideoCapture, 0)
-             if not cap.isOpened():
-                 st.error("Could not open camera.") # This error needs to reach Streamlit UI
-                 self.running = False # Stop the loop if camera fails
                  return

-             while self.running:
-                 frame_data = await asyncio.to_thread(self._get_frame, cap)
-                 if frame_data is None:
-                     await asyncio.sleep(0.01) # Short sleep if frame read fails
-                     continue
-
-                 st.session_state['current_frame'] = frame_data["preview"]
-
-                 if self.out_queue.full():
-                     await self.out_queue.get() # Make space if full to avoid indefinite block
-
-                 await self.out_queue.put(frame_data["api"])
                  await asyncio.sleep(1.0 / VIDEO_FPS_LIMIT)
          except Exception as e:
-             st.error(f"Camera streaming error: {e}")
-             self.running = False
          finally:
-             if cap:
-                 await asyncio.to_thread(cap.release)
-
-     def _get_screen_frame(self): # Renamed for clarity
-         sct = mss.mss()
-         # Use the first monitor
-         monitor_number = 1
-         if len(sct.monitors) > 1: # sct.monitors[0] is all monitors, sct.monitors[1] is primary
-             monitor = sct.monitors[monitor_number]
-         else: # If only one monitor entry (all), just use it.
-             monitor = sct.monitors[0]
-
-
-         screenshot = sct.grab(monitor)
-         img = PIL.Image.frombytes("RGB", screenshot.size, screenshot.rgb)
-
-         preview_img = img.copy()
-         preview_img.thumbnail(VIDEO_PREVIEW_RESIZE)
-
-         api_img = img.copy()
-         api_img.thumbnail(VIDEO_API_RESIZE)
-
-         image_io = io.BytesIO()
-         api_img.save(image_io, format="jpeg")
-         image_io.seek(0)
-         image_bytes = image_io.read()
-
-         return {
-             "preview": preview_img,
-             "api": {
-                 "mime_type": "image/jpeg",
-                 "data": base64.b64encode(image_bytes).decode()
              }
-         }

-     async def get_frames_from_screen(self): # Renamed for clarity
          try:
-             while self.running:
-                 frame_data = await asyncio.to_thread(self._get_screen_frame)
-                 if frame_data is None:
-                     await asyncio.sleep(0.01)
-                     continue
-
-                 st.session_state['current_frame'] = frame_data["preview"]
-
-                 if self.out_queue.full():
-                     await self.out_queue.get()
-
-                 await self.out_queue.put(frame_data["api"])
                  await asyncio.sleep(1.0 / VIDEO_FPS_LIMIT)
          except Exception as e:
-             st.error(f"Screen capture error: {e}")
-             self.running = False
-

-     async def send_realtime_media(self): # Renamed
          try:
-             while self.running:
-                 if not self.session:
-                     await asyncio.sleep(0.1) # Wait for session to be established
                      continue
                  try:
-                     msg = await asyncio.wait_for(self.out_queue.get(), timeout=0.5) # Timeout to prevent blocking indefinitely
-                     if self.session and self.running: # Re-check session and running status
-                         await self.session.send(input=msg) # No end_of_turn for continuous media
-                     self.out_queue.task_done()
                  except asyncio.TimeoutError:
-                     continue # No new media to send
                  except Exception as e:
-                     if self.running: # Only log if we are supposed to be running
-                         print(f"Error in send_realtime_media: {e}") # Log to console
-                     # Consider if this error should stop the loop or be reported to UI
-                     await asyncio.sleep(0.1) # Prevent tight loop on error
          except asyncio.CancelledError:
-             print("send_realtime_media task cancelled.")
          finally:
-             print("send_realtime_media task finished.")

-
-     async def listen_for_audio(self): # Renamed
-         self.audio_stream = None
          try:
              mic_info = await asyncio.to_thread(pya.get_default_input_device_info)
-             self.audio_stream = await asyncio.to_thread(
                  pya.open,
-                 format=FORMAT,
-                 channels=CHANNELS,
-                 rate=SEND_SAMPLE_RATE,
-                 input=True,
-                 input_device_index=mic_info["index"],
                  frames_per_buffer=CHUNK_SIZE,
              )
              print("Microphone stream opened.")
-             while self.running:
                  try:
-                     # exception_on_overflow=False helps avoid crashes on buffer overflows
-                     data = await asyncio.to_thread(self.audio_stream.read, CHUNK_SIZE, exception_on_overflow=False)
-                     if self.out_queue.full():
-                         await self.out_queue.get() # Make space
-                     await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})
-                 except IOError as e: # PyAudio specific IO errors
-                     if e.errno == pyaudio.paInputOverflowed:
-                         print("PyAudio Input overflowed. Skipping frame.") # Or log to a file/UI
                      else:
-                         print(f"PyAudio read error: {e}")
-                         self.running = False # Potentially stop on other IOErrors
-                         break
                  except Exception as e:
-                     print(f"Error in listen_for_audio: {e}")
-                     await asyncio.sleep(0.01) # Prevent tight loop on error
          except Exception as e:
-             st.error(f"Failed to open microphone: {e}") # This error needs to reach Streamlit UI
-             self.running = False
          finally:
-             if self.audio_stream:
-                 await asyncio.to_thread(self.audio_stream.stop_stream)
-                 await asyncio.to_thread(self.audio_stream.close)
-             print("Microphone stream closed.")

-
-     async def receive_gemini_responses(self): # Renamed
          try:
-             while self.running:
-                 if not self.session:
-                     await asyncio.sleep(0.1) # Wait for session
-                     continue
                  try:
-                     # Blocking receive, but should yield if self.running becomes false or session closes
-                     turn = self.session.receive()
-                     async for response in turn:
-                         if not self.running: break # Exit if stop signal received during iteration
-                         if data := response.data: # Audio data
-                             if not self.audio_in_queue.full():
-                                 self.audio_in_queue.put_nowait(data)
                              else:
-                                 print("Playback audio queue full, discarding data.")
-                         if text := response.text: # Text part of the response
-                             # Queue this for the main thread to update Streamlit
-                             st.session_state['chat_messages'].append({"role": "assistant", "content": text})
-                             # Consider st.experimental_rerun() if immediate update is critical and safe
-                             # For now, rely on Streamlit's natural refresh from chat_input or other interactions
-
-                     # Handle turn completion logic if needed (e.g., clear audio queue for interruptions)
-                     # For simplicity, current model might not need complex interruption handling here.
-                     # If interruptions are implemented (e.g., user speaks while AI is speaking),
-                     # you might want to clear self.audio_in_queue here.
                  except types.generation_types.StopCandidateException:
-                     print("Gemini indicated end of response (StopCandidateException).") # Normal
                  except Exception as e:
-                     if self.running:
                          print(f"Error receiving from Gemini: {e}")
-                     await asyncio.sleep(0.1) # Prevent tight loop on error
          except asyncio.CancelledError:
-             print("receive_gemini_responses task cancelled.")
          finally:
-             print("receive_gemini_responses task finished.")
-

-     async def play_audio_responses(self): # Renamed
-         playback_stream = None
          try:
-             playback_stream = await asyncio.to_thread(
-                 pya.open,
-                 format=FORMAT, # Assuming Gemini audio matches this, or adjust
-                 channels=CHANNELS,
-                 rate=RECEIVE_SAMPLE_RATE,
-                 output=True,
              )
              print("Audio playback stream opened.")
-             while self.running:
                  try:
-                     bytestream = await asyncio.wait_for(self.audio_in_queue.get(), timeout=0.5)
-                     await asyncio.to_thread(playback_stream.write, bytestream)
-                     self.audio_in_queue.task_done()
                  except asyncio.TimeoutError:
-                     continue # No audio to play
                  except Exception as e:
                      print(f"Error playing audio: {e}")
-                     await asyncio.sleep(0.01) # Prevent tight loop
          except Exception as e:
-             st.error(f"Failed to open audio playback: {e}")
-             self.running = False
          finally:
-             if playback_stream:
-                 await asyncio.to_thread(playback_stream.stop_stream)
-                 await asyncio.to_thread(playback_stream.close)
-             print("Audio playback stream closed.")
-
-     def stop_loop(self): # Renamed
-         print("Stop signal received for AudioLoop.")
-         self.running = False
-         # Queues can be an issue for graceful shutdown if tasks are blocked on put/get
-         # Put sentinel values or use timeouts in queue operations
-         if self.out_queue: # For send_realtime_media
-             self.out_queue.put_nowait(None) # Sentinel to unblock .get()
-         if self.audio_in_queue: # For play_audio_responses
-             self.audio_in_queue.put_nowait(None) # Sentinel
-
-     async def run(self):
-         st.session_state['run_loop'] = True # Indicate loop is running
-         self.running = True
-         print("AudioLoop starting...")
          try:
-             # `client.aio.live.connect` is an async context manager
-             async with client.aio.live.connect(model=MODEL, config=CONFIG) as session:
-                 self.session = session
                  print("Gemini session established.")

-                 # Send the system prompt first.
                  try:
                      print("Sending system prompt to Gemini...")
-                     # end_of_turn=False means this text is part of the initial context for the first actual user interaction.
-                     await self.session.send(input=MEDICAL_ASSISTANT_SYSTEM_PROMPT, end_of_turn=False)
-                     print("System prompt sent.")
                  except Exception as e:
-                     st.error(f"Failed to send system prompt to Gemini: {str(e)}")
-                     traceback.print_exception(e)
-                     self.running = False # Stop if system prompt fails critical setup
-                     return # Exit run method

-                 # Initialize queues within the async context if they depend on loop specifics
-                 self.audio_in_queue = asyncio.Queue(maxsize=AUDIO_QUEUE_MAXSIZE)
-                 self.out_queue = asyncio.Queue(maxsize=10) # For outgoing media to Gemini API

                  async with asyncio.TaskGroup() as tg:
-                     # Start all background tasks
-                     print("Starting child tasks...")
-                     tg.create_task(self.send_realtime_media(), name="send_realtime_media")
-                     tg.create_task(self.listen_for_audio(), name="listen_for_audio")

                      if self.video_mode == "camera":
-                         tg.create_task(self.get_frames_from_camera(), name="get_frames_from_camera")
                      elif self.video_mode == "screen":
-                         tg.create_task(self.get_frames_from_screen(), name="get_frames_from_screen")
-                     # If mode is "none", no video task is started.

-                     tg.create_task(self.receive_gemini_responses(), name="receive_gemini_responses")
-                     tg.create_task(self.play_audio_responses(), name="play_audio_responses")
-                     print("All child tasks created.")

-                 # TaskGroup will wait for all tasks to complete here.
-                 # If self.running is set to False, tasks should ideally notice and exit.
-                 print("TaskGroup finished.")

          except asyncio.CancelledError:
-             print("AudioLoop.run() was cancelled.") # Usually from TaskGroup cancellation
-         except ExceptionGroup as eg: # From TaskGroup if child tasks fail
-             st.error(f"Error in async tasks: {eg.exceptions[0]}") # Show first error in UI
-             print(f"ExceptionGroup caught in AudioLoop.run(): {eg}")
              for i, exc in enumerate(eg.exceptions):
-                 print(f" Exception {i+1}/{len(eg.exceptions)} in TaskGroup: {type(exc).__name__}: {exc}")
                  traceback.print_exception(type(exc), exc, exc.__traceback__)
-         except Exception as e:
-             st.error(f"Critical error in session: {str(e)}")
-             print(f"Exception caught in AudioLoop.run(): {type(e).__name__}: {e}")
-             traceback.print_exception(e)
          finally:
-             print("AudioLoop.run() finishing, cleaning up...")
-             self.running = False # Ensure all loops stop
-             st.session_state['run_loop'] = False # Signal that the loop has stopped
-             # `self.session` will be closed automatically by the `async with` block for `client.aio.live.connect`
-             self.session = None
-             # Other stream closures are handled in their respective task's finally blocks
-             print("AudioLoop finished.")


- def main():
-     st.title("Gemini Live Medical Assistant")

      with st.sidebar:
-         st.subheader("Settings")
          video_mode_options = ["camera", "screen", "none"]
-         # Ensure default video mode is in options, find its index
-         default_video_index = video_mode_options.index(DEFAULT_MODE) if DEFAULT_MODE in video_mode_options else 0
-         video_mode = st.selectbox("Video Source", video_mode_options, index=default_video_index)
-
-         if not st.session_state.get('run_loop', False): # If loop is not running
-             if st.button("Start Session", key="start_session_button"):
-                 st.session_state.chat_messages = [{ # Clear chat and add system message
                      "role": "system",
                      "content": (
-                         "Medical Assistant activated. The AI has been instructed on its role to visually assist you. "
-                         "Please remember, this AI cannot provide medical diagnoses or replace consultation with a healthcare professional."
                      )
                  }]
-                 st.session_state.current_frame = None # Clear previous frame

-                 audio_loop = AudioLoop(video_mode=video_mode)
-                 st.session_state.audio_loop = audio_loop

-                 # Run the asyncio event loop in a new thread
-                 # daemon=True allows Streamlit to exit even if this thread is stuck (though it shouldn't be)
-                 threading.Thread(target=lambda: asyncio.run(audio_loop.run()), daemon=True).start()
-                 st.success("Session started. Initializing assistant...")
-                 st.rerun() # Rerun to update button state and messages
-         else: # If loop is running
-             if st.button("Stop Session", key="stop_session_button"):
-                 if st.session_state.audio_loop:
-                     st.session_state.audio_loop.stop_loop() # Signal async tasks to stop
-                     # Wait a moment for tasks to attempt cleanup (optional, can be tricky)
-                     # time.sleep(1)
-                     st.session_state.audio_loop = None
-                 st.warning("Session stopping...")
-                 st.rerun() # Rerun to update UI
-
-     # Main content area
-     col1, col2 = st.columns([2, 3]) # Adjust column ratio as needed

-     with col1:
-         st.subheader("Video Feed")
-         if st.session_state.get('run_loop', False) and st.session_state.get('current_frame') is not None:
-             st.image(st.session_state['current_frame'], caption="Live Feed" if video_mode != "none" else "Video Disabled", use_column_width=True)
-         elif video_mode != "none":
-             st.info("Video feed will appear here when the session starts.")
          else:
-             st.info("Video input is disabled.")

-     with col2:
-         st.subheader("Chat with Medical Assistant")
-         chat_container = st.container() # For scrolling chat
-         with chat_container:
-             for msg in st.session_state.chat_messages:
-                 with st.chat_message(msg["role"]):
-                     st.write(msg["content"])

-         prompt = st.chat_input("Ask about what you're showing...", key="chat_input_box", disabled=not st.session_state.get('run_loop', False))
-         if prompt:
-             st.session_state.chat_messages.append({"role": "user", "content": prompt})
-             st.rerun() # Show user message immediately
-
-             if st.session_state.audio_loop:
-                 # The text needs to be sent from within the asyncio loop or by scheduling it.
-                 # A simple way is to call a method on audio_loop that uses asyncio.create_task or similar.
-                 # For direct call from thread to asyncio loop, ensure it's thread-safe.
-                 # A better way is to put the text into a queue that send_text_to_gemini reads from,
-                 # or use asyncio.run_coroutine_threadsafe if the loop is known.
-
-                 # Current send_text_to_gemini is an async method.
-                 # We need to run it in the event loop of the audio_loop's thread.
-                 loop = asyncio.get_event_loop_policy().get_event_loop() # Get current thread's loop (might not be the one)
-                 if st.session_state.audio_loop.session: # Ensure session exists
-                     # This is a simplified approach; proper thread-safe coroutine scheduling is more robust.
-                     # Consider using asyncio.run_coroutine_threadsafe if audio_loop.run() exposes its loop.
-                     asyncio.run(st.session_state.audio_loop.send_text_to_gemini(prompt))
                  else:
-                     st.error("Session not fully active to send message.")
              else:
-                 st.error("Session is not active. Please start a session.")
-             # Rerun after processing to show potential AI response (if text part comes quickly)
-             # st.rerun() # This might be too frequent, rely on receive_gemini_responses to update chat
-

  if __name__ == "__main__":
-     # Global PyAudio termination hook (optional, for very clean shutdowns)
-     # def cleanup_pyaudio():
-     #     print("Terminating PyAudio globally.")
-     #     pya.terminate()
-     # import atexit
-     # atexit.register(cleanup_pyaudio)
-     main()

  import base64
  import io
  import threading
  import traceback
+ import time # For potential delays or timestamps if needed in future
+ import atexit # For cleanup actions on exit
+
  from dotenv import load_dotenv

  # --- Import main libraries ---
  import cv2
  import pyaudio
  import PIL.Image
+ import mss # For screen capture

  from google import genai
  from google.genai import types

  # --- Configuration ---
+ load_dotenv() # Load environment variables from .env file

  # Audio configuration
  FORMAT = pyaudio.paInt16
  CHANNELS = 1
  SEND_SAMPLE_RATE = 16000
+ RECEIVE_SAMPLE_RATE = 24000 # Gemini documentation recommendation
  CHUNK_SIZE = 1024
+ AUDIO_PLAYBACK_QUEUE_MAXSIZE = 50 # Buffer for audio chunks from Gemini for playback
+ MEDIA_OUT_QUEUE_MAXSIZE = 20 # Buffer for audio/video frames to send to Gemini

  # Video configuration
  VIDEO_FPS_LIMIT = 1 # Send 1 frame per second to the API
+ VIDEO_PREVIEW_RESIZE = (640, 480) # Size for Streamlit preview display
+ VIDEO_API_RESIZE = (1024, 1024) # Max size for images sent to API (aspect ratio preserved)

  # Gemini model configuration
+ MODEL_NAME = "models/gemini-2.0-flash-live-001" # VERIFY THIS MODEL NAME IS CORRECT FOR LIVE API
+ DEFAULT_VIDEO_MODE = "camera" # Default video input mode ("camera", "screen", "none")

  # System Prompt for the Medical Assistant
+ MEDICAL_ASSISTANT_SYSTEM_PROMPT = """You are an AI Medical Assistant. Your primary function is to analyze visual information from the user's camera or screen and respond via voice.

  Your responsibilities are:
  1. **Visual Observation and Description:** Carefully examine the images or video feed. Describe relevant details you observe.
  Example of a disclaimer you might use: "As an AI assistant, I can describe what I see, but I can't provide medical advice or diagnoses. For any health concerns, it's always best to speak with a doctor or other healthcare professional."
  """

+ # Initialize PyAudio instance globally
+ pya = pyaudio.PyAudio()
+
+ # Ensure PyAudio is terminated on exit
+ def cleanup_pyaudio():
+     print("Terminating PyAudio instance.")
+     pya.terminate()
+ atexit.register(cleanup_pyaudio)
+
+
+ # Initialize Streamlit session state variables
  def init_session_state():
+     defaults = {
+         'app_initialized': True, # To ensure this runs only once per session
+         'session_active': False,
+         'audio_loop_instance': None,
+         'chat_messages': [],
+         'current_frame_preview': None,
+         'video_mode_selection': DEFAULT_VIDEO_MODE,
+     }
+     for key, value in defaults.items():
+         if key not in st.session_state:
+             st.session_state[key] = value
+
  init_session_state()

+
+ # Configure Streamlit page
+ st.set_page_config(page_title="Live Medical Assistant", layout="wide")

  # Initialize Gemini client
  GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
  if not GEMINI_API_KEY:
      st.error("GEMINI_API_KEY not found. Please set it in your environment variables or a .env file.")
      st.stop()

+ # Use 'client' consistently for the genai.Client instance
+ client = None # Define client in the global scope
+ try:
+     client = genai.Client(
+         http_options={"api_version": "v1beta"}, # Required for live
+         api_key=GEMINI_API_KEY,
+     )
+ except Exception as e:
+     st.error(f"Failed to initialize Gemini client: {e}")
+     st.stop()

+ # Gemini LiveConnectConfig
+ LIVE_CONNECT_CONFIG = types.LiveConnectConfig(
+     response_modalities=["audio", "text"], # Expect both audio and text responses
      speech_config=types.SpeechConfig(
          voice_config=types.VoiceConfig(
+             prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr") # Changed voice to Zephyr
          )
      ),
  )

+ class AudioVisualLoop:
+     def __init__(self, video_mode_setting=DEFAULT_VIDEO_MODE):
+         self.video_mode = video_mode_setting
+         self.gemini_session = None
+         self.async_event_loop = None # To store the loop for thread-safe calls
+
+         self.audio_playback_queue = None # asyncio.Queue for audio from Gemini
+         self.media_to_gemini_queue = None # asyncio.Queue for audio/video to Gemini
+
+         self.is_running = True # Flag to control all async tasks
+         self.mic_stream = None # PyAudio input stream
+         self.playback_stream = None # PyAudio output stream
+         self.camera_capture = None # OpenCV VideoCapture object

+     async def send_text_input_to_gemini(self, user_text):
+         if not user_text or not self.gemini_session or not self.is_running:
+             print("Warning: Cannot send text. Session not active or no text.")
              return
          try:
+             print(f"Sending text to Gemini: {user_text}")
+             await self.gemini_session.send(input=user_text, end_of_turn=True)
          except Exception as e:
+             st.error(f"Error sending text message to Gemini: {e}")
+             print(f"Traceback for send_text_input_to_gemini: {traceback.format_exc()}")

+     def _process_camera_frame(self):
+         if not self.camera_capture or not self.camera_capture.isOpened():
+             print("Camera not available or not open.")
+             return None
+         ret, frame = self.camera_capture.read()
          if not ret:
+             print("Failed to read frame from camera.")
              return None

          frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
          preview_img.thumbnail(VIDEO_PREVIEW_RESIZE)

          api_img = img.copy()
+         api_img.thumbnail(VIDEO_API_RESIZE) # Preserves aspect ratio

          image_io = io.BytesIO()
          api_img.save(image_io, format="jpeg")
+         image_bytes = image_io.getvalue()

          return {
              "preview": preview_img,
+             "api_data": {"mime_type": "image/jpeg", "data": base64.b64encode(image_bytes).decode()}
          }

+     async def stream_camera_frames(self):
          try:
+             self.camera_capture = await asyncio.to_thread(cv2.VideoCapture, 0) # 0 for default camera
+             if not self.camera_capture.isOpened():
+                 st.error("Could not open camera. Please check permissions and availability.")
+                 self.is_running = False
                  return

+             while self.is_running:
+                 frame_data = await asyncio.to_thread(self._process_camera_frame)
+                 if frame_data:
+                     st.session_state['current_frame_preview'] = frame_data["preview"]
+                     if self.media_to_gemini_queue.full():
+                         await self.media_to_gemini_queue.get() # Make space
+                     await self.media_to_gemini_queue.put(frame_data["api_data"])
                  await asyncio.sleep(1.0 / VIDEO_FPS_LIMIT)
          except Exception as e:
+             # st.error(f"Camera streaming error: {e}") # Avoid st.error from non-main thread if problematic
+             print(f"Camera streaming error: {e}\nTraceback: {traceback.format_exc()}")
+             self.is_running = False
          finally:
+             if self.camera_capture:
+                 await asyncio.to_thread(self.camera_capture.release)
+                 self.camera_capture = None
+             print("Camera streaming task finished.")
+
+     def _process_screen_frame(self):
+         with mss.mss() as sct:
+             monitor_index = 1
+             if len(sct.monitors) <= monitor_index:
+                 monitor_index = 0
+             monitor = sct.monitors[monitor_index]
+
+             sct_img = sct.grab(monitor)
+             img = PIL.Image.frombytes("RGB", (sct_img.width, sct_img.height), sct_img.rgb)
+
+             preview_img = img.copy()
+             preview_img.thumbnail(VIDEO_PREVIEW_RESIZE)
+
+             api_img = img.copy()
+             api_img.thumbnail(VIDEO_API_RESIZE)
+
+             image_io = io.BytesIO()
+             api_img.save(image_io, format="jpeg")
+             image_bytes = image_io.getvalue()
+
+             return {
+                 "preview": preview_img,
+                 "api_data": {"mime_type": "image/jpeg", "data": base64.b64encode(image_bytes).decode()}
              }

+     async def stream_screen_frames(self):
          try:
+             while self.is_running:
+                 frame_data = await asyncio.to_thread(self._process_screen_frame)
+                 if frame_data:
+                     st.session_state['current_frame_preview'] = frame_data["preview"]
+                     if self.media_to_gemini_queue.full():
+                         await self.media_to_gemini_queue.get()
+                     await self.media_to_gemini_queue.put(frame_data["api_data"])
                  await asyncio.sleep(1.0 / VIDEO_FPS_LIMIT)
          except Exception as e:
+             # st.error(f"Screen capture error: {e}")
+             print(f"Screen capture error: {e}\nTraceback: {traceback.format_exc()}")
+             self.is_running = False
+         finally:
+             print("Screen streaming task finished.")

+     async def stream_media_to_gemini(self):
          try:
+             while self.is_running:
+                 if not self.gemini_session:
+                     await asyncio.sleep(0.1)
                      continue
                  try:
+                     media_chunk = await asyncio.wait_for(self.media_to_gemini_queue.get(), timeout=1.0)
+                     if media_chunk and self.gemini_session and self.is_running:
+                         await self.gemini_session.send(input=media_chunk)
+                     if media_chunk: # Avoid task_done on None if used as sentinel
+                         self.media_to_gemini_queue.task_done()
                  except asyncio.TimeoutError:
+                     continue
                  except Exception as e:
+                     if self.is_running:
+                         print(f"Error in stream_media_to_gemini: {e}")
+                     await asyncio.sleep(0.1)
          except asyncio.CancelledError:
+             print("stream_media_to_gemini task cancelled.")
          finally:
+             print("Media streaming to Gemini task finished.")
+     async def capture_microphone_audio(self):
          try:
              mic_info = await asyncio.to_thread(pya.get_default_input_device_info)
+             self.mic_stream = await asyncio.to_thread(
                  pya.open,
+                 format=FORMAT, channels=CHANNELS, rate=SEND_SAMPLE_RATE,
+                 input=True, input_device_index=mic_info["index"],
                  frames_per_buffer=CHUNK_SIZE,
              )
              print("Microphone stream opened.")
+             while self.is_running:
                  try:
+                     audio_data = await asyncio.to_thread(self.mic_stream.read, CHUNK_SIZE, exception_on_overflow=False)
+                     if self.media_to_gemini_queue.full():
+                         await self.media_to_gemini_queue.get()
+                     await self.media_to_gemini_queue.put({"data": audio_data, "mime_type": "audio/pcm"})
+                 except IOError as e:
+                     if e.errno == pyaudio.paInputOverflowed: # type: ignore
+                         print("Microphone Input overflowed. Skipping.")
                      else:
+                         print(f"Microphone read IOError: {e}")
+                         self.is_running = False; break
                  except Exception as e:
+                     print(f"Error in capture_microphone_audio: {e}")
+                     await asyncio.sleep(0.01)
          except Exception as e:
+             # st.error(f"Failed to open microphone: {e}. Please check permissions.")
+             print(f"Failed to open microphone: {e}\nTraceback: {traceback.format_exc()}")
+             self.is_running = False
          finally:
+             if self.mic_stream:
+                 await asyncio.to_thread(self.mic_stream.stop_stream)
+                 await asyncio.to_thread(self.mic_stream.close)
+                 self.mic_stream = None
+             print("Microphone capture task finished.")

+     async def process_gemini_responses(self):
          try:
+             while self.is_running:
+                 if not self.gemini_session:
+                     await asyncio.sleep(0.1); continue
                  try:
+                     turn_response = self.gemini_session.receive()
+                     async for chunk in turn_response:
+                         if not self.is_running: break
+                         if audio_data := chunk.data:
+                             if not self.audio_playback_queue.full():
+                                 self.audio_playback_queue.put_nowait(audio_data)
                              else:
+                                 print("Audio playback queue full, discarding data.")
+                         if text_response := chunk.text:
+                             # Schedule Streamlit update from the main thread if possible,
+                             # or use st.session_state and rely on rerun.
+                             st.session_state['chat_messages'] = st.session_state['chat_messages'] + [{"role": "assistant", "content": text_response}]
+                             # If immediate UI update is needed and safe:
+                             # st.experimental_rerun() # Use with caution from background threads
                  except types.generation_types.StopCandidateException:
+                     print("Gemini response stream ended (StopCandidateException).")
                  except Exception as e:
+                     if self.is_running:
                          print(f"Error receiving from Gemini: {e}")
+                     await asyncio.sleep(0.1)
          except asyncio.CancelledError:
+             print("process_gemini_responses task cancelled.")
          finally:
+             print("Gemini response processing task finished.")

+     async def play_gemini_audio(self):
          try:
+             self.playback_stream = await asyncio.to_thread(
+                 pya.open, format=FORMAT, channels=CHANNELS, rate=RECEIVE_SAMPLE_RATE, output=True
              )
              print("Audio playback stream opened.")
+             while self.is_running:
                  try:
+                     audio_chunk = await asyncio.wait_for(self.audio_playback_queue.get(), timeout=1.0)
+                     if audio_chunk: # Check if not None (sentinel)
+                         await asyncio.to_thread(self.playback_stream.write, audio_chunk)
+                     if audio_chunk: # Avoid task_done on None
+                         self.audio_playback_queue.task_done()
                  except asyncio.TimeoutError:
+                     continue
                  except Exception as e:
                      print(f"Error playing audio: {e}")
+                     await asyncio.sleep(0.01)
          except Exception as e:
+             # st.error(f"Failed to open audio playback stream: {e}")
+             print(f"Failed to open audio playback stream: {e}\nTraceback: {traceback.format_exc()}")
+             self.is_running = False
          finally:
+             if self.playback_stream:
+                 await asyncio.to_thread(self.playback_stream.stop_stream)
+                 await asyncio.to_thread(self.playback_stream.close)
+                 self.playback_stream = None
+             print("Audio playback task finished.")
+
+     def signal_stop(self):
+         print("Signal to stop AudioVisualLoop received.")
+         self.is_running = False
+         if self.media_to_gemini_queue: self.media_to_gemini_queue.put_nowait(None)
+         if self.audio_playback_queue: self.audio_playback_queue.put_nowait(None)
+
+     async def run_main_loop(self):
+         self.async_event_loop = asyncio.get_running_loop()
+         self.is_running = True
+         # st.session_state['session_active'] = True # Set by caller in Streamlit thread
+
+         print("AudioVisualLoop starting...")
+         # Ensure client is not None before using (should be handled by global init)
+         if client is None:
+             print("Error: Gemini client is not initialized.")
+             st.error("Critical Error: Gemini client failed to initialize. Cannot start session.")
+             st.session_state['session_active'] = False
+             return
+
          try:
+             # Use the global 'client' instance
+             async with client.aio.live.connect(model=MODEL_NAME, config=LIVE_CONNECT_CONFIG) as session:
+                 self.gemini_session = session
                  print("Gemini session established.")

                  try:
                      print("Sending system prompt to Gemini...")
+                     await self.gemini_session.send(input=MEDICAL_ASSISTANT_SYSTEM_PROMPT, end_of_turn=False)
+                     print("System prompt sent successfully.")
                  except Exception as e:
+                     # st.error(f"Failed to send system prompt to Gemini: {e}")
+                     print(f"Failed to send system prompt to Gemini: {e}\nTraceback: {traceback.format_exc()}")
+                     self.is_running = False
+                     return

+                 self.audio_playback_queue = asyncio.Queue(maxsize=AUDIO_PLAYBACK_QUEUE_MAXSIZE)
+                 self.media_to_gemini_queue = asyncio.Queue(maxsize=MEDIA_OUT_QUEUE_MAXSIZE)

                  async with asyncio.TaskGroup() as tg:
+                     print("Creating async tasks...")
+                     tg.create_task(self.stream_media_to_gemini(), name="stream_media_to_gemini")
+                     tg.create_task(self.capture_microphone_audio(), name="capture_microphone_audio")

                      if self.video_mode == "camera":
+                         tg.create_task(self.stream_camera_frames(), name="stream_camera_frames")
                      elif self.video_mode == "screen":
+                         tg.create_task(self.stream_screen_frames(), name="stream_screen_frames")

+                     tg.create_task(self.process_gemini_responses(), name="process_gemini_responses")
+                     tg.create_task(self.play_gemini_audio(), name="play_gemini_audio")
+                     print("All async tasks created in TaskGroup.")

+                 print("TaskGroup finished execution.")

          except asyncio.CancelledError:
+             print("AudioVisualLoop.run_main_loop() was cancelled.")
+         except ExceptionGroup as eg:
+             # st.error(f"An error occurred in one of the concurrent tasks: {eg.exceptions[0]}")
+             print(f"ExceptionGroup caught in AudioVisualLoop: {eg}")
              for i, exc in enumerate(eg.exceptions):
+                 print(f" Task Exception {i+1}/{len(eg.exceptions)}: {type(exc).__name__}: {exc}")
                  traceback.print_exception(type(exc), exc, exc.__traceback__)
+         except Exception as e: # This catches the ConnectionClosedError
+             # st.error(f"A critical error occurred in the main session loop: {e}")
+             print(f"General Exception in AudioVisualLoop: {type(e).__name__}: {e}")
+             traceback.print_exception(e) # Print full traceback for debugging
          finally:
+             print("AudioVisualLoop.run_main_loop() finishing...")
+             self.is_running = False
+             # st.session_state['session_active'] = False # Set by caller in Streamlit thread
+             self.gemini_session = None
+             print("AudioVisualLoop finished.")

+ # --- Streamlit UI ---
+ def run_streamlit_app():
+     st.title("Live AI Medical Assistant")

      with st.sidebar:
+         st.header("Session Control")
          video_mode_options = ["camera", "screen", "none"]
+         current_video_mode = st.session_state.get('video_mode_selection', DEFAULT_VIDEO_MODE)
+         selected_video_mode = st.selectbox(
+             "Video Source:",
+             video_mode_options,
+             index=video_mode_options.index(current_video_mode),
+             disabled=st.session_state.get('session_active', False)
+         )
+         if selected_video_mode != current_video_mode: # Update if changed and not active
+             st.session_state['video_mode_selection'] = selected_video_mode
+
+         if not st.session_state.get('session_active', False):
+             if st.button("🚀 Start Session", type="primary", use_container_width=True):
+                 st.session_state['session_active'] = True # Set active before starting thread
+                 st.session_state['chat_messages'] = [{
                      "role": "system",
                      "content": (
+                         "Medical Assistant is activating. The AI has been instructed on its role to visually assist you. "
+                         "Remember: This AI cannot provide medical diagnoses or replace consultation with a healthcare professional."
                      )
                  }]
+                 st.session_state['current_frame_preview'] = None

+                 audio_loop_instance = AudioVisualLoop(video_mode_setting=st.session_state['video_mode_selection'])
+                 st.session_state['audio_loop_instance'] = audio_loop_instance

+                 threading.Thread(target=lambda: asyncio.run(audio_loop_instance.run_main_loop()), daemon=True).start()
+                 st.success("Session starting... Please wait for initialization.")
+                 time.sleep(1)
+                 st.rerun()
+         else:
+             if st.button("🛑 Stop Session", type="secondary", use_container_width=True):
+                 if st.session_state.get('audio_loop_instance'):
+                     st.session_state['audio_loop_instance'].signal_stop()
+                     st.session_state['audio_loop_instance'] = None
+                 st.session_state['session_active'] = False # Set inactive
+                 st.warning("Session stopping... Please wait.")
+                 time.sleep(1)
+                 st.rerun()
+
+     col_video, col_chat = st.columns([2, 3])

+     with col_video:
+         st.subheader("Live Feed")
+         if st.session_state.get('session_active', False) and st.session_state.get('video_mode_selection', DEFAULT_VIDEO_MODE) != "none":
+             if st.session_state.get('current_frame_preview') is not None:
+                 st.image(st.session_state['current_frame_preview'], caption="Live Video Feed", use_column_width=True)
+             else:
+                 st.info("Waiting for video feed...")
+         elif st.session_state.get('video_mode_selection', DEFAULT_VIDEO_MODE) != "none":
+             st.info("Video feed will appear here once the session starts.")
          else:
+             st.info("Video input is disabled for this session.")

+     with col_chat:
+         st.subheader("Chat with Assistant")
+         # Display chat messages from session state
+         for msg in st.session_state.get('chat_messages', []):
+             with st.chat_message(msg["role"]):
+                 st.write(msg["content"])

+         user_chat_input = st.chat_input(
+             "Type your message or ask about the video...",
+             key="user_chat_input_box",
+             disabled=not st.session_state.get('session_active', False)
+         )
+
+         if user_chat_input:
+             # Append user message and rerun to display it immediately
+             st.session_state['chat_messages'] = st.session_state.get('chat_messages', []) + [{"role": "user", "content": user_chat_input}]
+
+             loop_instance = st.session_state.get('audio_loop_instance')
+             if loop_instance and loop_instance.async_event_loop and loop_instance.gemini_session:
+                 if loop_instance.async_event_loop.is_running():
+                     asyncio.run_coroutine_threadsafe(
+                         loop_instance.send_text_input_to_gemini(user_chat_input),
+                         loop_instance.async_event_loop
+                     )
                  else:
+                     st.error("Session event loop is not running. Cannot send message.")
+             elif not loop_instance or not st.session_state.get('session_active', False):
+                 st.error("Session is not active. Please start a session to send messages.")
              else:
+                 st.warning("Session components not fully ready. Please wait a moment.")
+             st.rerun() # Rerun to show user message and any quick AI text response

  if __name__ == "__main__":
+     run_streamlit_app()
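
The new version hands chat text from the Streamlit script thread to the background session with asyncio.run_coroutine_threadsafe, rather than calling asyncio.run() in the handler as the old code did. A minimal, self-contained sketch of that handoff pattern, using hypothetical names and a stand-in for the Gemini session (not code from this app):

# A daemon thread owns an asyncio event loop; the main (Streamlit) thread
# schedules coroutines onto it and can wait on the returned Future.
import asyncio
import threading

class BackgroundLoop:
    def __init__(self):
        self.loop = None                     # set once the loop is running
        self.ready = threading.Event()       # signals that self.loop is usable

    def run(self):
        async def _main():
            self.loop = asyncio.get_running_loop()
            self.ready.set()
            await asyncio.Event().wait()      # stand-in for the long-lived session loop
        asyncio.run(_main())

async def send_text(text):
    # Stand-in for session.send(input=text, end_of_turn=True)
    print(f"sending: {text}")

if __name__ == "__main__":
    bg = BackgroundLoop()
    threading.Thread(target=bg.run, daemon=True).start()
    bg.ready.wait()
    # Called from the main thread: schedule the coroutine on the background loop.
    future = asyncio.run_coroutine_threadsafe(send_text("hello"), bg.loop)
    future.result(timeout=5)

The same reasoning explains signal_stop() in the diff: setting a flag alone cannot wake tasks blocked on queue.get(), so a None sentinel is pushed into each queue to unblock them.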