noumanjavaid commited on
Commit
7630a47
·
verified ·
1 Parent(s): 4afb504

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +298 -660
app.py CHANGED
@@ -1,69 +1,25 @@
1
- # -*- coding: utf-8 -*-
2
- import streamlit as st
3
  import os
 
4
  import asyncio
5
- import base64
6
- import io
7
- import threading
8
- import traceback
9
- import atexit
10
- import time
11
  import logging
12
- from dotenv import load_dotenv
13
-
 
14
  import cv2
15
- import pyaudio
16
- import PIL.Image
17
-
18
- # Import websockets for explicit exception handling
19
- import websockets.exceptions
20
-
21
- from google import genai
22
- from google.genai import types
23
  from google.genai.types import Content, Part
 
 
 
 
24
 
25
- from streamlit_webrtc import (
26
- webrtc_streamer,
27
- WebRtcMode,
28
- AudioProcessorBase,
29
- VideoProcessorBase,
30
- )
31
-
32
- load_dotenv()
33
-
34
-
35
- # Audio configuration - fix audio format issues
36
- FORMAT = pyaudio.paInt16
37
- CHANNELS = 1
38
- SEND_SAMPLE_RATE = 16000 # Changed to match mime_type for consistency
39
- RECEIVE_SAMPLE_RATE = 16000 # Changed from 24000 to 16000 to match send rate
40
- CHUNK_SIZE = 1024
41
-
42
- # Map PyAudio format to a more descriptive name for clarity.
43
- PYAUDIO_FORMAT = FORMAT # pyaudio.paInt16
44
- PYAUDIO_CHANNELS = CHANNELS
45
- PYAUDIO_PLAYBACK_CHUNK_SIZE = CHUNK_SIZE
46
- GEMINI_AUDIO_RECEIVE_SAMPLE_RATE = RECEIVE_SAMPLE_RATE
47
-
48
- # Video configuration
49
- VIDEO_FPS_TO_GEMINI = 1 # Reduced from 2 to lower bandwidth
50
- VIDEO_API_RESIZE = (512, 512) # Reduced from 1024x1024 to lower payload size
51
- MAX_PAYLOAD_SIZE_BYTES = 60000 # Just under 64KB WebSocket limit
52
-
53
- # Queue sizes
54
- MEDIA_TO_GEMINI_QUEUE_MAXSIZE = 10
55
- AUDIO_PLAYBACK_QUEUE_MAXSIZE = 10
56
-
57
- # WebRTC settings
58
- WEBRTC_REQUESTED_SEND_SAMPLE_RATE = SEND_SAMPLE_RATE
59
- WEBRTC_REQUESTED_AUDIO_CHANNELS = CHANNELS
60
 
61
-
62
- # !!! IMPORTANT: Verify this model name is correct for the Live API !!!
63
- MODEL_NAME = "models/gemini-2.0-flash-live-001"
64
- logging.info(f"Using Gemini Model: {MODEL_NAME}")
65
-
66
- MEDICAL_ASSISTANT_SYSTEM_PROMPT = """You are an AI Medical Assistant. Your primary function is to analyze visual information from the user's camera or screen and respond via voice.
67
  Your responsibilities are:
68
  1. **Visual Observation and Description:** Carefully examine the images or video feed. Describe relevant details you observe.
69
  2. **General Information (Non-Diagnostic):** Provide general information related to what is visually presented, if applicable. You are not a diagnostic tool.
@@ -75,641 +31,323 @@ Your responsibilities are:
75
  4. **Tone:** Maintain a helpful, empathetic, and calm tone.
76
  5. **Interaction:** After this initial instruction, you can make a brief acknowledgment of your role (e.g., "I'm ready to assist by looking at what you show me. Please remember to consult a doctor for medical advice."). Then, focus on responding to the user's visual input and questions.
77
  Example of a disclaimer you might use: "As an AI assistant, I can describe what I see, but I can't provide medical advice or diagnoses. For any health concerns, it's always best to speak with a doctor or other healthcare professional."
78
- """
79
-
80
- # --- PyAudio Global Instance and Cleanup ---
81
- pya = None
82
- try:
83
- pya = pyaudio.PyAudio()
84
-
85
- def cleanup_pyaudio():
86
- logging.info("Terminating PyAudio instance.")
87
- if pya:
88
- pya.terminate()
89
- atexit.register(cleanup_pyaudio)
90
- logging.info("PyAudio initialized successfully.")
91
- except Exception as e_pyaudio:
92
- logging.warning(
93
- f"PyAudio initialization failed (expected in some server environments): {e_pyaudio}")
94
- pya = None
95
-
96
- # --- Global Queues - Declare as None, initialize later ---
97
- video_frames_to_gemini_q: asyncio.Queue = None
98
- audio_chunks_to_gemini_q: asyncio.Queue = None
99
- audio_from_gemini_playback_q: asyncio.Queue = None
100
-
101
- # --- Gemini Client Setup ---
102
- # Try to get API key from environment or use a manually provided one
103
- def initialize_gemini_client():
104
- # Check for API key in various places
105
- api_key = os.environ.get("GEMINI_API_KEY")
106
-
107
- # Look for .env file (original or new)
108
- if not api_key:
109
- # Hardcoded API key from the user's message as fallback
110
- api_key = "AIzaSyBy5-l1xR1FN78jQB-MbJhQbRzq-ruoXuI"
111
-
112
- # Try reading from .env.new which we know exists and has permissions
113
- env_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env.new")
114
- try:
115
- if os.path.exists(env_file):
116
- with open(env_file, "r") as f:
117
- for line in f:
118
- if line.startswith("GEMINI_API_KEY="):
119
- api_key = line.strip().split("=", 1)[1]
120
- # Remove quotes if present
121
- api_key = api_key.strip('\'"')
122
- break
123
- except (PermissionError, IOError) as e:
124
- logging.warning(f"Could not read {env_file}: {e}")
125
- # Continue with the hardcoded key
126
-
127
- # Initialize client with the API key
128
- if api_key:
129
- try:
130
- client = genai.Client(http_options={"api_version": "v1beta"}, api_key=api_key)
131
- logging.info("Gemini client initialized successfully.")
132
- return client
133
- except Exception as e:
134
- logging.critical(f"Gemini client initialization failed: {e}", exc_info=True)
135
- return None
136
- else:
137
- logging.critical("GEMINI_API_KEY not found.")
138
- return None
139
-
140
- client = initialize_gemini_client()
141
-
142
- # Configure the Gemini Live connection with proper settings
143
- LIVE_CONNECT_CONFIG = types.LiveConnectConfig(
144
- response_modalities=["audio"], # Only requesting audio and text responses
145
- speech_config=types.SpeechConfig(
146
- voice_config=types.VoiceConfig(
147
- prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr")
148
- )
149
- )
150
- )
151
- logging.info(f"Attempting connection with LiveConnectConfig: {LIVE_CONNECT_CONFIG}")
152
-
153
 
154
- # --- Backend Gemini Interaction Loop ---
155
  class GeminiInteractionLoop:
156
- def __init__(self):
 
 
 
 
 
 
 
 
 
157
  self.gemini_session = None
158
- self.async_event_loop = None
159
- self.is_running = True
160
- self.playback_stream = None
161
-
162
- async def send_text_input_to_gemini(self, user_text):
163
- if not user_text or not self.gemini_session or not self.is_running:
164
- logging.warning(
165
- "Cannot send text. Session not active, no text, or not running.")
166
- return
167
- try:
168
- logging.info(f"Sending text to Gemini: '{user_text[:50]}...'")
169
- # Use send_client_content as specified in the error message
170
- content = Content(parts=[Part(text=user_text)])
171
- await self.gemini_session.send_client_content(content)
172
- except Exception as e:
173
- logging.error(
174
- f"Error sending text message to Gemini: {e}", exc_info=True)
175
-
176
- # Helper function to validate and possibly resize media data
177
- def _validate_media_payload(self, media_data):
178
- """Validate and potentially reduce size of media payload"""
179
- if not isinstance(media_data, dict):
180
- logging.warning(f"Invalid media data type: {type(media_data)}")
181
- return None
182
-
183
- if not all(k in media_data for k in ["data", "mime_type"]):
184
- logging.warning(f"Media data missing required fields")
185
- return None
186
 
187
- # Handle audio data - ensure proper format for Gemini API
188
- if media_data["mime_type"].startswith("audio/"):
 
189
  try:
190
- # Ensure audio data is in bytes format
191
- if isinstance(media_data["data"], bytes):
192
- # No need to base64 encode binary audio data for Gemini API
193
- # Just ensure the mime_type is correctly formatted
194
- if "rate=" not in media_data["mime_type"]:
195
- # Default to 16kHz if not specified
196
- media_data["mime_type"] = f"audio/L16;rate=16000;channels=1"
197
-
198
- # Create a new dict to avoid modifying the original
199
- return {
200
- "mime_type": media_data["mime_type"],
201
- "data": media_data["data"]
202
- }
203
- else:
204
- logging.warning(f"Audio data is not in bytes format: {type(media_data['data'])}")
205
- return None
206
  except Exception as e:
207
- logging.error(f"Error processing audio data: {e}", exc_info=True)
208
- return None
209
-
210
- # Check if it's an image and needs resizing
211
- if media_data["mime_type"].startswith("image/"):
212
  try:
213
- data_size = len(media_data["data"])
214
- if data_size > MAX_PAYLOAD_SIZE_BYTES:
215
- logging.warning(f"Image payload too large ({data_size} bytes), reducing quality")
216
- # Decode base64 image
217
- img_bytes = base64.b64decode(media_data["data"])
218
- img = PIL.Image.open(io.BytesIO(img_bytes))
219
-
220
- # Try lower quality JPEG
221
- buffer = io.BytesIO()
222
- img.save(buffer, format="JPEG", quality=70)
223
- buffer.seek(0)
224
- smaller_bytes = buffer.getvalue()
225
-
226
- # Update the data with reduced size image
227
- media_data["data"] = base64.b64encode(smaller_bytes).decode()
228
  except Exception as e:
229
- logging.error(f"Error resizing image: {e}", exc_info=True)
230
-
231
- return media_data
232
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
233
  async def stream_media_to_gemini(self):
234
- logging.info("Task started: Stream media from WebRTC queues to Gemini.")
235
-
236
- async def get_media_from_queues():
237
- if video_frames_to_gemini_q is None or audio_chunks_to_gemini_q is None:
238
- await asyncio.sleep(0.1)
239
- return None
240
- try:
241
- video_frame = await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.02)
242
- if video_frame is None:
243
- return None # Sentinel received
244
- video_frames_to_gemini_q.task_done()
245
- return video_frame
246
- except asyncio.TimeoutError:
247
- pass
248
- except Exception as e:
249
- logging.error(f"Error getting video from queue: {e}", exc_info=True)
250
- try:
251
- audio_chunk = await asyncio.wait_for(audio_chunks_to_gemini_q.get(), timeout=0.02)
252
- if audio_chunk is None:
253
- return None # Sentinel received
254
- audio_chunks_to_gemini_q.task_done()
255
- return audio_chunk
256
- except asyncio.TimeoutError:
257
- return None
258
- except Exception as e:
259
- logging.error(f"Error getting audio from queue: {e}", exc_info=True)
260
- return None
261
-
262
  try:
 
 
 
263
  while self.is_running:
264
- if not self.gemini_session:
265
- await asyncio.sleep(0.1)
266
- continue
267
- media_data = await get_media_from_queues()
268
- if media_data is None and not self.is_running:
269
- break # Sentinel and stop signal
270
-
271
- if media_data and self.gemini_session and self.is_running:
272
- try:
273
- validated_media = self._validate_media_payload(media_data)
274
- if validated_media:
275
- # Log media type and size before sending
276
- data_size = len(validated_media.get('data', b'')) if isinstance(validated_media.get('data'), bytes) else len(validated_media.get('data', ''))
277
- logging.debug(f"Sending media to Gemini. Type: {validated_media.get('mime_type')}, Data size: {data_size} bytes")
278
-
279
- # Ensure we're not exceeding WebSocket payload limits
280
- if data_size > MAX_PAYLOAD_SIZE_BYTES:
281
- logging.warning(f"Media payload exceeds maximum size ({data_size} > {MAX_PAYLOAD_SIZE_BYTES}), skipping")
282
- continue
283
-
284
- # Send the validated media to Gemini
285
- await self.gemini_session.send(input=validated_media)
286
- else:
287
- # Log if validation failed, but only if media_data was not None initially
288
- if media_data is not None:
289
- logging.warning(f"Media validation failed for payload. Type: {media_data.get('mime_type') if isinstance(media_data, dict) else type(media_data)}, skipping send.")
290
- except websockets.exceptions.ConnectionClosedError as e_conn_closed:
291
- error_code = getattr(e_conn_closed, 'code', None)
292
- error_reason = getattr(e_conn_closed, 'reason', 'Unknown reason')
293
- logging.error(f"WebSocket connection closed with code {error_code}: {error_reason}")
294
- logging.error(f"Connection closed while sending media: {e_conn_closed}", exc_info=True)
295
-
296
- # If we get a 1007 error (invalid frame payload data), log more details
297
- if error_code == 1007:
298
- logging.error(f"Invalid frame payload data error. This is likely due to malformed media data.")
299
- if isinstance(media_data, dict):
300
- logging.error(f"Media type: {media_data.get('mime_type', 'unknown')}, Data type: {type(media_data.get('data', None))}")
301
 
302
- # Stop the interaction loop if connection is lost
303
- self.is_running = False
304
- except Exception as e:
305
- logging.error(f"Error sending media chunk to Gemini: {e}", exc_info=True)
306
- elif not media_data: # media_data could be None if queues were empty and timed out
307
- await asyncio.sleep(0.05) # Yield to other tasks if no media
308
- except asyncio.CancelledError:
309
- logging.info("Task cancelled: stream_media_to_gemini.")
310
- finally:
311
- logging.info("Task finished: stream_media_to_gemini.")
312
-
313
- async def process_gemini_responses(self):
314
- logging.info("Task started: Process responses from Gemini.")
315
- try:
316
- while self.is_running:
317
- if not self.gemini_session:
318
- await asyncio.sleep(0.1)
319
- continue
320
- if audio_from_gemini_playback_q is None:
321
- await asyncio.sleep(0.1)
322
- continue
323
- try:
324
- turn_response = self.gemini_session.receive()
325
- async for chunk in turn_response:
326
- if not self.is_running:
327
- break
328
- if audio_data := chunk.data:
329
- if not audio_from_gemini_playback_q.full():
330
- audio_from_gemini_playback_q.put_nowait(audio_data)
331
- else:
332
- logging.warning(
333
- "Audio playback queue full, discarding Gemini audio data.")
334
- if text_response := chunk.text:
335
- logging.info(f"Gemini text response: {text_response[:100]}")
336
- except types.generation_types.StopCandidateException:
337
- logging.info("Gemini response stream ended normally.")
338
- except Exception as e:
339
- if self.is_running:
340
- logging.error(
341
- f"Error receiving from Gemini: {e}", exc_info=True)
342
- await asyncio.sleep(0.1)
343
- except asyncio.CancelledError:
344
- logging.info("Task cancelled: process_gemini_responses.")
345
- finally:
346
- logging.info("Task finished: process_gemini_responses.")
347
-
348
- async def play_gemini_audio(self):
349
- logging.info("Task started: Play Gemini audio responses.")
350
- if pya is None:
351
- logging.warning(
352
- "PyAudio not available. Audio playback task will not run.")
353
  return
354
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
355
  try:
356
- while audio_from_gemini_playback_q is None and self.is_running:
357
- await asyncio.sleep(0.1)
358
- if not self.is_running:
359
- return
360
-
361
- self.playback_stream = await asyncio.to_thread(
362
- pya.open, format=PYAUDIO_FORMAT, channels=PYAUDIO_CHANNELS, rate=GEMINI_AUDIO_RECEIVE_SAMPLE_RATE, output=True, frames_per_buffer=PYAUDIO_PLAYBACK_CHUNK_SIZE
363
- )
364
- logging.info(
365
- f"PyAudio playback stream opened at {GEMINI_AUDIO_RECEIVE_SAMPLE_RATE} Hz.")
366
  while self.is_running:
367
- try:
368
- audio_chunk = await asyncio.wait_for(audio_from_gemini_playback_q.get(), timeout=1.0)
369
- if audio_chunk is None and not self.is_running:
370
- break # Sentinel and stop signal
371
- if audio_chunk:
372
- await asyncio.to_thread(self.playback_stream.write, audio_chunk)
373
- if audio_chunk:
374
- audio_from_gemini_playback_q.task_done()
375
- except asyncio.TimeoutError:
376
- continue
377
- except Exception as e:
378
- logging.error(f"Error playing audio chunk: {e}", exc_info=True)
379
- await asyncio.sleep(0.01)
380
  except Exception as e:
381
- logging.error(
382
- f"Failed to open or use PyAudio playback stream (might be expected in this environment): {e}", exc_info=True)
383
- finally:
384
- if self.playback_stream:
385
- logging.info("Stopping and closing PyAudio playback stream.")
386
- try:
387
- await asyncio.to_thread(self.playback_stream.stop_stream)
388
- await asyncio.to_thread(self.playback_stream.close)
389
- except Exception as e_close:
390
- logging.error(
391
- f"Error closing playback stream: {e_close}", exc_info=True)
392
- self.playback_stream = None
393
- logging.info("Task finished: play_gemini_audio.")
394
-
395
- def signal_stop(self):
396
- logging.info("Signal to stop GeminiInteractionLoop received.")
397
- self.is_running = False
398
- for q_name, q_obj_ref in [("video_q", video_frames_to_gemini_q),
399
- ("audio_in_q", audio_chunks_to_gemini_q),
400
- ("audio_out_q", audio_from_gemini_playback_q)]:
401
- if q_obj_ref:
402
- try:
403
- q_obj_ref.put_nowait(None)
404
- except asyncio.QueueFull:
405
- logging.warning(
406
- f"Queue {q_name} was full when trying to put sentinel for stop signal.")
407
- except Exception as e:
408
- logging.error(
409
- f"Error putting sentinel in {q_name}: {e}", exc_info=True)
410
-
411
- async def run_main_loop(self):
412
- global video_frames_to_gemini_q, audio_chunks_to_gemini_q, audio_from_gemini_playback_q
413
-
414
- self.async_event_loop = asyncio.get_running_loop()
415
- self.is_running = True
416
- logging.info("GeminiInteractionLoop run_main_loop starting...")
417
-
418
- video_frames_to_gemini_q = asyncio.Queue(
419
- maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
420
- audio_chunks_to_gemini_q = asyncio.Queue(
421
- maxsize=MEDIA_TO_GEMINI_QUEUE_MAXSIZE)
422
- audio_from_gemini_playback_q = asyncio.Queue(
423
- maxsize=AUDIO_PLAYBACK_QUEUE_MAXSIZE)
424
- logging.info("Asyncio queues initialized in GeminiInteractionLoop.")
425
-
426
- if client is None:
427
- logging.critical(
428
- "Gemini client is None in run_main_loop. Aborting.")
429
- return
430
-
431
  try:
432
- async with client.aio.live.connect(model=MODEL_NAME, config=LIVE_CONNECT_CONFIG) as session:
433
- self.gemini_session = session
434
- logging.info(
435
- f"Gemini session established with API for model {MODEL_NAME}.")
436
  try:
437
- logging.info("Sending system prompt to Gemini...")
438
- # Use send_client_content with proper format
439
- content = Content(parts=[Part(text=MEDICAL_ASSISTANT_SYSTEM_PROMPT)])
440
- await self.gemini_session.send_client_content(content)
441
- logging.info("System prompt sent successfully.")
 
 
442
  except Exception as e:
443
- logging.error(
444
- f"Failed to send system prompt: {e}", exc_info=True)
445
- self.is_running = False
446
- return
447
-
448
- tasks = []
449
- try:
450
- logging.info("Creating async tasks for Gemini interaction...")
451
- media_stream_task = asyncio.create_task(self.stream_media_to_gemini(), name="stream_media_to_gemini")
452
- response_process_task = asyncio.create_task(self.process_gemini_responses(), name="process_gemini_responses")
453
- audio_play_task = asyncio.create_task(self.play_gemini_audio(), name="play_gemini_audio")
454
- tasks = [media_stream_task, response_process_task, audio_play_task]
455
- logging.info("All Gemini interaction tasks created.")
456
-
457
- # Wait for all tasks to complete, collecting all results/exceptions
458
- results = await asyncio.gather(*tasks, return_exceptions=True)
459
-
460
- for i, result in enumerate(results):
461
- if isinstance(result, Exception):
462
- task_name = tasks[i].get_name() if hasattr(tasks[i], 'get_name') else f"Task-{i}"
463
- logging.error(f"Task '{task_name}' failed: {result}", exc_info=result)
464
- # If one task fails, we might want to signal others to stop.
465
- # self.signal_stop() # This is already called in finally, but could be earlier if needed.
466
- except asyncio.CancelledError:
467
- logging.info("One or more tasks were cancelled during gather.")
468
- except Exception as e_gather:
469
- logging.error(f"Error during task management with asyncio.gather: {e_gather}", exc_info=True)
470
- finally:
471
- # Ensure all tasks are cancelled if not already done, before main loop finally block
472
- for task in tasks:
473
- if task and not task.done():
474
- task.cancel()
475
- # Await their cancellation (or completion if they finished cleanly before cancel)
476
- if tasks: # Ensure tasks list is not empty
477
- await asyncio.gather(*tasks, return_exceptions=True) # Suppress errors from already handled/cancelled tasks
478
- logging.info("Gemini interaction tasks processing completed or handled.")
479
-
480
- except websockets.exceptions.ConnectionClosedError as e:
481
- logging.error(f"WebSocket connection closed with error code {e.code}: {e}")
482
- st.error(f"Connection to Gemini API failed: {e}. Please try again.")
483
- except asyncio.CancelledError:
484
- logging.info("GeminiInteractionLoop.run_main_loop() was cancelled.")
485
- except Exception as e: # General catch-all
486
- logging.error(
487
- f"Exception in GeminiInteractionLoop run_main_loop: {type(e).__name__}: {e}", exc_info=True)
488
- finally:
489
- logging.info("GeminiInteractionLoop.run_main_loop() finishing...")
490
  self.is_running = False
491
- self.signal_stop() # Ensure sentinels are sent
492
-
493
- self.gemini_session = None
494
- video_frames_to_gemini_q = None
495
- audio_chunks_to_gemini_q = None
496
- audio_from_gemini_playback_q = None
497
- logging.info(
498
- "GeminiInteractionLoop finished and global queues set to None.")
499
-
500
-
501
- # --- WebRTC Media Processors ---
502
- class VideoProcessor(VideoProcessorBase):
503
- def __init__(self):
504
- self.frame_counter = 0
505
- self.last_gemini_send_time = time.monotonic()
506
-
507
- async def _process_and_queue_frame_async(self, frame_ndarray):
508
- if video_frames_to_gemini_q is None:
509
  return
510
- self.frame_counter += 1
511
- current_time = time.monotonic()
512
- if (current_time - self.last_gemini_send_time) < (1.0 / VIDEO_FPS_TO_GEMINI):
 
 
 
 
 
 
 
 
 
 
 
513
  return
514
- self.last_gemini_send_time = current_time
515
  try:
516
- img_rgb = cv2.cvtColor(frame_ndarray, cv2.COLOR_BGR2RGB)
517
- pil_img = PIL.Image.fromarray(img_rgb)
518
- pil_img.thumbnail(VIDEO_API_RESIZE) # Smaller resolution
519
- image_io = io.BytesIO()
520
- pil_img.save(image_io, format="jpeg", quality=85) # Lower quality
521
- image_bytes = image_io.getvalue()
 
 
522
 
523
- # Check if image size is too large before encoding to base64
524
- if len(image_bytes) > MAX_PAYLOAD_SIZE_BYTES:
525
- logging.warning(f"Image too large ({len(image_bytes)} bytes), reducing quality further")
526
- image_io = io.BytesIO()
527
- pil_img.save(image_io, format="jpeg", quality=60) # Even lower quality
528
- image_bytes = image_io.getvalue()
529
 
530
- api_data = {"mime_type": "image/jpeg",
531
- "data": base64.b64encode(image_bytes).decode()}
532
-
533
- if video_frames_to_gemini_q.full():
534
- try:
535
- await asyncio.wait_for(video_frames_to_gemini_q.get(), timeout=0.01)
536
- except asyncio.TimeoutError:
537
- logging.warning("Video queue full, frame dropped.")
538
- return
539
- video_frames_to_gemini_q.put_nowait(api_data)
540
  except Exception as e:
541
- logging.error(
542
- f"Error processing/queueing video frame: {e}", exc_info=True)
543
-
544
- async def recv(self, frame):
545
- img_bgr = frame.to_ndarray(format="bgr24")
546
  try:
547
- loop = asyncio.get_running_loop()
548
- loop.create_task(self._process_and_queue_frame_async(img_bgr))
549
- except RuntimeError:
550
- logging.error(
551
- "VideoProcessor.recv: No running asyncio loop in current thread for create_task.")
552
- return frame
553
-
554
-
555
- class AudioProcessor(AudioProcessorBase):
556
- async def _process_and_queue_audio_async(self, audio_frames):
557
- if audio_chunks_to_gemini_q is None:
558
- return
559
- for frame in audio_frames:
 
 
 
 
 
 
560
  try:
561
- # Extract audio data from frame
562
- audio_data = frame.planes[0].to_bytes()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
563
 
564
- # Skip empty audio frames
565
- if not audio_data or len(audio_data) == 0:
566
- continue
567
 
568
- # Ensure we're using the correct format for Gemini API
569
- # WebSocket error 1007 occurs with invalid frame payload data
570
- # Using standard audio/L16 with 16kHz sample rate (matches SEND_SAMPLE_RATE)
571
- mime_type = "audio/L16;rate=16000;channels=1"
572
 
573
- # Validate audio data before queueing
574
- if isinstance(audio_data, bytes) and len(audio_data) > 0:
575
- # Check if data size is reasonable (avoid oversized payloads)
576
- if len(audio_data) > MAX_PAYLOAD_SIZE_BYTES:
577
- logging.warning(f"Audio chunk too large ({len(audio_data)} bytes), skipping")
578
- continue
579
-
580
- # Create properly formatted API data
581
- api_data = {
582
- "data": audio_data, # Keep as bytes, don't base64 encode
583
- "mime_type": mime_type
584
- }
585
-
586
- # Handle queue overflow
587
- if audio_chunks_to_gemini_q.full():
588
- try:
589
- # Remove oldest item if queue is full
590
- await asyncio.wait_for(audio_chunks_to_gemini_q.get(), timeout=0.01)
591
- audio_chunks_to_gemini_q.task_done()
592
- except asyncio.TimeoutError:
593
- logging.warning("Audio queue full, chunk dropped.")
594
- continue
595
-
596
- # Queue the validated audio data
597
- audio_chunks_to_gemini_q.put_nowait(api_data)
598
- else:
599
- logging.warning(f"Invalid audio data format: {type(audio_data)}, skipping")
600
  except Exception as e:
601
- logging.error(f"Error processing audio chunk: {e}", exc_info=True)
602
-
603
- async def recv(self, frames):
604
- try:
605
- loop = asyncio.get_running_loop()
606
- loop.create_task(self._process_and_queue_audio_async(frames))
607
- except RuntimeError:
608
- logging.error(
609
- "AudioProcessor.recv: No running asyncio loop in current thread for create_task.")
610
- return frames
611
-
612
-
613
- # --- Streamlit UI and Application Logic ---
614
- def initialize_app_session_state():
615
- defaults = {
616
- 'gemini_session_active': False,
617
- 'gemini_loop_instance': None,
618
- 'webrtc_component_key': f"webrtc_streamer_key_{int(time.time())}",
619
- }
620
- for key, value in defaults.items():
621
- if key not in st.session_state:
622
- st.session_state[key] = value
623
-
624
-
625
- def run_streamlit_app():
626
- st.set_page_config(page_title="Voice AI Medical Assistant", layout="wide")
627
- initialize_app_session_state()
628
-
629
- st.title("Voice AI Medical Assistant")
630
 
631
- # Display prominent error if client is not initialized
632
- if client is None:
633
- st.error("⚠️ Gemini API key not found or invalid. Please set a valid GEMINI_API_KEY in your .env file.")
634
- st.info("You can create a .env file in the project directory with content: GEMINI_API_KEY=your_api_key_here")
635
-
636
- st.warning("IMPORTANT: This is a VOICE-ONLY interface. Speak to the assistant through your microphone.")
637
- st.info("Remember: This AI cannot provide medical diagnoses. Always consult a healthcare professional for medical advice.")
638
-
639
- with st.sidebar:
640
- st.header("Session Control")
641
- if not st.session_state.gemini_session_active:
642
- # Fixed emojis
643
- if st.button("🚀 Start Voice Assistant", type="primary", use_container_width=True, key="start_session_btn"):
644
- st.session_state.gemini_session_active = True
645
-
646
- gemini_loop = GeminiInteractionLoop()
647
- st.session_state.gemini_loop_instance = gemini_loop
648
- threading.Thread(target=lambda: asyncio.run(gemini_loop.run_main_loop()), name="GeminiLoopThread", daemon=True).start()
649
- st.success("Voice Assistant starting... Please allow camera/microphone access in your browser if prompted.")
650
- st.session_state.webrtc_component_key = f"webrtc_streamer_key_{int(time.time())}"
651
- st.rerun()
652
- else:
653
- # Fixed emojis
654
- if st.button("🛑 Stop Session", type="secondary", use_container_width=True, key="stop_session_btn"):
655
- if st.session_state.gemini_loop_instance:
656
- st.session_state.gemini_loop_instance.signal_stop()
657
- st.session_state.gemini_loop_instance = None
658
- st.session_state.gemini_session_active = False
659
- st.warning("Session stopping...")
660
- time.sleep(0.5)
661
- st.rerun()
662
-
663
- if st.session_state.gemini_session_active:
664
- st.subheader("Your Live Feed (from your browser)")
665
-
666
- MEDIA_STREAM_CONSTRAINTS = {
667
- "video": True,
668
- "audio": {
669
- "sampleRate": {"ideal": WEBRTC_REQUESTED_SEND_SAMPLE_RATE},
670
- "channelCount": {"exact": WEBRTC_REQUESTED_AUDIO_CHANNELS},
671
- "echoCancellation": True,
672
- "noiseSuppression": True
673
- }
674
- }
675
-
676
- webrtc_ctx = webrtc_streamer(
677
- key=st.session_state.webrtc_component_key,
678
- mode=WebRtcMode.SENDONLY,
679
- rtc_configuration={
680
- "iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]
681
- },
682
- media_stream_constraints=MEDIA_STREAM_CONSTRAINTS,
683
- video_processor_factory=VideoProcessor,
684
- audio_processor_factory=AudioProcessor,
685
- async_processing=True,
686
  )
687
-
688
- if webrtc_ctx.state.playing:
689
- st.success("🎤 Voice Assistant is now ACTIVE. Speak to interact!")
690
- st.caption("The assistant is listening through your microphone and watching through your camera.")
691
- elif st.session_state.gemini_session_active:
692
- st.caption("Connecting... Ensure camera/microphone permissions are granted in your browser.")
693
- if hasattr(webrtc_ctx.state, 'error') and webrtc_ctx.state.error:
694
- st.error(f"WebRTC Connection Error: {webrtc_ctx.state.error}")
695
- else:
696
- st.info("Click 'Start Voice Assistant' in the sidebar to begin.")
697
 
698
- # Visual indicator for voice activity
699
- if st.session_state.gemini_session_active and webrtc_ctx.state.playing:
700
- with st.container():
701
- st.markdown("### How to use the Voice Assistant")
702
- st.markdown("""
703
- 1. **Speak naturally** - The assistant is listening through your microphone
704
- 2. **Show things to the camera** - The assistant can see what you're showing
705
- 3. **Listen for responses** - The assistant will speak back to you
706
-
707
- You do not need to type anything. This is a completely voice-controlled interface.
708
- """)
709
-
710
 
711
  if __name__ == "__main__":
712
- if client is None:
713
- logging.critical("Gemini client could not be initialized. Application cannot start.")
714
- else:
715
- run_streamlit_app()
 
 
 
1
  import os
2
+ import sys
3
  import asyncio
 
 
 
 
 
 
4
  import logging
5
+ import datetime
6
+ import argparse
7
+ import numpy as np
8
  import cv2
9
+ from queue import Queue
10
+ import time
11
+ import google as genai
 
 
 
 
 
12
  from google.genai.types import Content, Part
13
+ from azure.cognitiveservices.speech import SpeechConfig, SpeechSynthesizer, AudioConfig, ResultReason, CancellationReason
14
+ import sounddevice as sd
15
+ import soundfile as sf
16
+ import uuid
17
 
18
+ # Configure logging
19
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s:%(name)s:%(message)s')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
 
21
+ # Define system prompt for the medical assistant
22
+ MEDICAL_ASSISTANT_SYSTEM_PROMPT = '''You are an AI Medical Assistant. Your primary function is to analyze visual information from the user's camera or screen and respond via voice.
 
 
 
 
23
  Your responsibilities are:
24
  1. **Visual Observation and Description:** Carefully examine the images or video feed. Describe relevant details you observe.
25
  2. **General Information (Non-Diagnostic):** Provide general information related to what is visually presented, if applicable. You are not a diagnostic tool.
 
31
  4. **Tone:** Maintain a helpful, empathetic, and calm tone.
32
  5. **Interaction:** After this initial instruction, you can make a brief acknowledgment of your role (e.g., "I'm ready to assist by looking at what you show me. Please remember to consult a doctor for medical advice."). Then, focus on responding to the user's visual input and questions.
33
  Example of a disclaimer you might use: "As an AI assistant, I can describe what I see, but I can't provide medical advice or diagnoses. For any health concerns, it's always best to speak with a doctor or other healthcare professional."
34
+ '''
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
 
36
+ # Class to handle Gemini-Azure interaction
37
class GeminiInteractionLoop:
    """Drives the assistant's main loop.

    Responsibilities:
      * capture frames from the local camera and stream them to a Gemini session;
      * accept typed user input from stdin and forward it to Gemini;
      * read Gemini's text responses and speak them via Azure TTS + sounddevice.

    Fixes vs. the original revision: every blocking call (stdin ``input()``,
    camera ``read()``, Azure ``speak_text_async(...).get()``, ``sd.wait()``)
    is now dispatched with ``asyncio.to_thread`` so the event loop is never
    stalled while other coroutines (e.g. response streaming) need to run.
    """

    def __init__(self, gemini_api_key, azure_speech_key, azure_speech_region, use_camera=True, use_speech=True):
        """Store credentials and eagerly initialize camera / speech resources.

        Camera or speech failures are logged and downgrade the corresponding
        feature flag instead of raising, so the loop can still run degraded.
        """
        self.gemini_api_key = gemini_api_key
        self.azure_speech_key = azure_speech_key
        self.azure_speech_region = azure_speech_region
        self.use_camera = use_camera
        self.use_speech = use_speech

        # Initialize Gemini API (requires `import google.generativeai as genai`).
        genai.configure(api_key=self.gemini_api_key)
        self.model = genai.GenerativeModel('gemini-pro-vision')
        self.gemini_session = None  # created later in run_main_loop()

        # Initialize camera; on any failure fall back to camera-less mode.
        self.camera = None
        if self.use_camera:
            try:
                self.camera = cv2.VideoCapture(0)
                if not self.camera.isOpened():
                    logging.error("Failed to open camera device")
                    self.use_camera = False
            except Exception as e:
                logging.error(f"Error initializing camera: {e}")
                self.use_camera = False

        # Initialize Azure Speech Service; on failure disable speech output.
        if self.use_speech:
            try:
                self.speech_config = SpeechConfig(subscription=self.azure_speech_key, region=self.azure_speech_region)
                self.speech_config.speech_synthesis_voice_name = "en-US-JennyNeural"
                self.output_path = os.path.join(os.getcwd(), "temp_audio")
                os.makedirs(self.output_path, exist_ok=True)
            except Exception as e:
                logging.error(f"Error initializing Azure Speech Service: {e}")
                self.use_speech = False

        # Thread-safe queue bridging the response reader to the TTS worker.
        self.text_to_speech_queue = Queue()
        self.is_running = True

    def capture_image(self):
        """Grab one frame from the camera; return it (BGR ndarray) or None."""
        if not self.use_camera or self.camera is None:
            return None

        ret, frame = self.camera.read()
        if not ret:
            logging.error("Failed to capture image from camera")
            return None

        return frame

    async def stream_media_to_gemini(self):
        """Send a JPEG-encoded camera frame to Gemini every few seconds."""
        logging.info("Starting media stream to Gemini...")
        try:
            interval = 5  # seconds between frames
            last_capture_time = 0

            while self.is_running:
                current_time = time.time()
                if current_time - last_capture_time >= interval:
                    # cv2 camera reads block — keep them off the event loop.
                    frame = await asyncio.to_thread(self.capture_image)
                    if frame is not None:
                        _, encoded_image = cv2.imencode(".jpg", frame)
                        image_bytes = encoded_image.tobytes()

                        try:
                            # Convert to the format expected by Gemini.
                            image_part = Part.from_data(mime_type="image/jpeg", data=image_bytes)
                            content = Content(role="user", parts=[image_part])

                            # NOTE(review): assigning `.content` and then calling
                            # send_client_content() with no args mirrors the
                            # original code; confirm against the actual session
                            # API (google.generativeai vs google.genai are mixed
                            # in this file).
                            self.gemini_session.content = content
                            await self.gemini_session.send_client_content()

                            logging.info("Sent image to Gemini")
                        except Exception as e:
                            logging.error(f"Error sending image to Gemini: {e}")

                    last_capture_time = current_time

                await asyncio.sleep(1)
        except Exception as e:
            logging.error(f"Exception in stream_media_to_gemini: {e}")

    async def send_text_input_to_gemini(self, text):
        """Forward one line of user text to the Gemini session (no-op if empty
        or the session is not yet established)."""
        if not text or not self.gemini_session:
            return

        try:
            # Create content with text.
            text_part = Part.from_text(text)
            content = Content(role="user", parts=[text_part])

            # Send to Gemini (see NOTE(review) in stream_media_to_gemini about
            # this send pattern).
            self.gemini_session.content = content
            await self.gemini_session.send_client_content()

            logging.info(f"Sent text to Gemini: {text}")
        except Exception as e:
            logging.error(f"Error sending text to Gemini: {e}")

    async def process_text_input(self):
        """Read user text from stdin until 'exit'; relay each line to Gemini."""
        logging.info("Starting text input processing...")
        try:
            while self.is_running:
                # input() blocks — run it in a worker thread so the event loop
                # (frame streaming, response handling) keeps running meanwhile.
                user_input = await asyncio.to_thread(input, "Enter text (or 'exit' to quit): ")
                if user_input.lower() == 'exit':
                    self.is_running = False
                    break

                await self.send_text_input_to_gemini(user_input)
        except Exception as e:
            logging.error(f"Exception in process_text_input: {e}")
            self.is_running = False

    async def process_gemini_responses(self):
        """Consume streamed Gemini responses; enqueue any text for TTS."""
        logging.info("Starting Gemini response processing...")
        try:
            async for response in self.gemini_session:
                if not self.is_running:
                    break

                try:
                    # Only text parts are handled; other modalities are ignored.
                    if hasattr(response, 'text'):
                        text = response.text
                        if text:
                            logging.info(f"Gemini response: {text}")
                            if self.use_speech:
                                self.text_to_speech_queue.put(text)
                except Exception as e:
                    logging.error(f"Error processing Gemini response: {e}")
        except Exception as e:
            logging.error(f"Exception in process_gemini_responses: {e}")
            self.is_running = False

    async def text_to_speech_processor(self):
        """Drain the TTS queue, speaking each pending text in arrival order.

        Keeps running after shutdown is signalled until the queue is empty so
        queued responses are not dropped.
        """
        logging.info("Starting text-to-speech processor...")
        if not self.use_speech:
            return

        try:
            while self.is_running or not self.text_to_speech_queue.empty():
                if not self.text_to_speech_queue.empty():
                    # Single consumer, so empty()-then-get() cannot race here.
                    text = self.text_to_speech_queue.get()
                    await self._synthesize_speech(text)
                else:
                    await asyncio.sleep(0.5)
        except Exception as e:
            logging.error(f"Exception in text_to_speech_processor: {e}")

    async def _synthesize_speech(self, text):
        """Synthesize *text* to a temp WAV via Azure, then play it."""
        if not self.use_speech:
            return

        try:
            # Unique filename so concurrent/overlapping clips never collide.
            file_path = os.path.join(self.output_path, f"speech_{uuid.uuid4()}.wav")

            audio_config = AudioConfig(filename=file_path)
            synthesizer = SpeechSynthesizer(speech_config=self.speech_config, audio_config=audio_config)

            # speak_text_async(...).get() blocks until synthesis completes;
            # run it in a worker thread to keep the event loop responsive.
            result = await asyncio.to_thread(lambda: synthesizer.speak_text_async(text).get())

            if result.reason == ResultReason.SynthesizingAudioCompleted:
                logging.info(f"Speech synthesized and saved to {file_path}")
                await self._play_audio(file_path)
            elif result.reason == ResultReason.Canceled:
                cancellation = result.cancellation_details
                logging.error(f"Speech synthesis canceled: {cancellation.reason}")
                if cancellation.reason == CancellationReason.Error:
                    logging.error(f"Error details: {cancellation.error_details}")
        except Exception as e:
            logging.error(f"Error in speech synthesis: {e}")

    async def _play_audio(self, file_path):
        """Play a WAV file to completion, then delete it (best-effort)."""
        try:
            def _blocking_play():
                data, fs = sf.read(file_path)
                sd.play(data, fs)
                sd.wait()  # returns only when playback finishes

            # Playback blocks for the clip's full duration — off the loop.
            await asyncio.to_thread(_blocking_play)

            # Clean up the temp file; failure here is non-fatal.
            try:
                os.remove(file_path)
            except Exception as e:
                logging.warning(f"Failed to remove temp audio file {file_path}: {e}")
        except Exception as e:
            logging.error(f"Error playing audio: {e}")

    async def run_main_loop(self):
        """Open the Gemini session, send the system prompt, run all workers.

        Always releases the camera and closes the session on exit, whether
        shutdown was clean, cancelled, or caused by an exception.
        """
        # Hoisted so the cancellation sweep below can never hit an unbound name.
        tasks = []
        try:
            logging.info("Initializing Gemini session...")
            # NOTE(review): start_session_async() is not part of the public
            # google.generativeai GenerativeModel API (start_chat is) —
            # preserved from the original; verify against the SDK in use.
            self.gemini_session = await self.model.start_session_async()

            # Send the system prompt before starting any worker task.
            try:
                logging.info("Sending system prompt to Gemini...")
                system_content = Content(
                    role="user",
                    parts=[Part(text=MEDICAL_ASSISTANT_SYSTEM_PROMPT)]
                )
                self.gemini_session.content = system_content
                await self.gemini_session.send_client_content()
                logging.info("System prompt sent successfully.")
            except Exception as e:
                logging.error(f"Failed to send system prompt: {e}", exc_info=True)
                self.is_running = False
                return

            try:
                logging.info("Creating async tasks for Gemini interaction...")
                tasks.append(asyncio.create_task(self.stream_media_to_gemini(), name="stream_media_to_gemini"))
                tasks.append(asyncio.create_task(self.process_text_input(), name="process_text_input"))
                tasks.append(asyncio.create_task(self.process_gemini_responses(), name="process_gemini_responses"))
                if self.use_speech:
                    tasks.append(asyncio.create_task(self.text_to_speech_processor(), name="text_to_speech_processor"))

                await asyncio.gather(*tasks)
            except asyncio.CancelledError:
                logging.info("Main loop tasks cancelled")
            except Exception as e:
                logging.error(f"Exception in main loop tasks: {e}")
            finally:
                # Cancel any still-running workers and wait for them to stop.
                for task in tasks:
                    if not task.done():
                        task.cancel()
                        try:
                            await task
                        except asyncio.CancelledError:
                            logging.info(f"Task {task.get_name()} cancelled")
        except Exception as e:
            logging.error(f"Exception in run_main_loop: {e}")
        finally:
            # Cleanup runs on every exit path.
            logging.info("Cleaning up resources...")
            if self.camera is not None and self.use_camera:
                self.camera.release()

            if self.gemini_session is not None:
                await self.gemini_session.close()

    def cleanup(self):
        """Synchronous fallback cleanup for callers outside the event loop."""
        logging.info("Cleaning up resources...")
        if self.camera is not None and self.use_camera:
            self.camera.release()
311
+
312
+ # Main function
313
def main():
    """CLI entry point: parse arguments, validate credentials, run the loop.

    Returns:
        int: 0 on clean shutdown, 1 on missing credentials or an
        unhandled runtime failure.
    """
    parser = argparse.ArgumentParser(description="Medical Assistant using Gemini and Azure Speech")
    parser.add_argument("--gemini-api-key", help="Gemini API Key", default=os.environ.get("GEMINI_API_KEY"))
    parser.add_argument("--azure-speech-key", help="Azure Speech API Key", default=os.environ.get("AZURE_SPEECH_KEY"))
    parser.add_argument("--azure-speech-region", help="Azure Speech Region", default=os.environ.get("AZURE_SPEECH_REGION", "eastus"))
    parser.add_argument("--no-camera", help="Disable camera usage", action="store_true")
    parser.add_argument("--no-speech", help="Disable speech synthesis", action="store_true")
    args = parser.parse_args()

    # Guard clauses: each cloud service needs a key unless explicitly disabled.
    if not args.gemini_api_key:
        print("Error: Gemini API Key is required. Provide it via --gemini-api-key or GEMINI_API_KEY environment variable.")
        return 1
    if not args.azure_speech_key and not args.no_speech:
        print("Error: Azure Speech Key is required for speech synthesis. Provide it via --azure-speech-key or AZURE_SPEECH_KEY environment variable, or use --no-speech to disable speech.")
        return 1

    try:
        loop_runner = GeminiInteractionLoop(
            gemini_api_key=args.gemini_api_key,
            azure_speech_key=args.azure_speech_key,
            azure_speech_region=args.azure_speech_region,
            use_camera=not args.no_camera,
            use_speech=not args.no_speech,
        )
        # Drive the whole interaction loop to completion.
        asyncio.run(loop_runner.run_main_loop())
    except KeyboardInterrupt:
        logging.info("Keyboard interrupt received. Shutting down...")
    except Exception as e:
        logging.error(f"Unhandled exception: {e}", exc_info=True)
        return 1

    return 0
 
 
 
351
 
352
# Script entry point: propagate main()'s status code as the process exit code.
if __name__ == "__main__":
    sys.exit(main())