# Page-header residue from the hosted file view this source was copied from
# (Spaces status: Running).
# File size: 15,802 Bytes
# Blame gutter referenced commits: 67ea58d, 7630a47, 922f944, 90b4c24, b089c63
# Line-number gutter of the original view: 1-357
import os
import sys
import asyncio
import logging
import datetime
import argparse
import numpy as np
import cv2
from queue import Queue
import time
import google as genai
from google.genai.types import Content, Part
from azure.cognitiveservices.speech import SpeechConfig, SpeechSynthesizer, AudioConfig, ResultReason, CancellationReason
import sounddevice as sd
import soundfile as sf
import uuid
# Configure the root logger once at import time: INFO level, with each record
# formatted as "<timestamp> - <LEVEL>:<logger name>:<message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s:%(name)s:%(message)s')
# System prompt for the medical assistant. GeminiInteractionLoop.run_main_loop
# sends this text to the Gemini session as the first "user" turn. The text is
# runtime data: editing any character changes what the model is instructed to do.
MEDICAL_ASSISTANT_SYSTEM_PROMPT = '''You are an AI Medical Assistant. Your primary function is to analyze visual information from the user's camera or screen and respond via voice.
Your responsibilities are:
1. **Visual Observation and Description:** Carefully examine the images or video feed. Describe relevant details you observe.
2. **General Information (Non-Diagnostic):** Provide general information related to what is visually presented, if applicable. You are not a diagnostic tool.
3. **Safety and Disclaimer (CRITICAL):**
* You are an AI assistant, **NOT a medical doctor or a substitute for one.**
* **DO NOT provide medical diagnoses, treatment advice, or interpret medical results (e.g., X-rays, scans, lab reports).**
* When appropriate, and always if the user seems to be seeking diagnosis or treatment, explicitly state your limitations and **strongly advise the user to consult a qualified healthcare professional.**
* If you see something that *appears* visually concerning (e.g., an unusual skin lesion, signs of injury), you may gently suggest it might be wise to have it looked at by a professional, without speculating on what it is.
4. **Tone:** Maintain a helpful, empathetic, and calm tone.
5. **Interaction:** After this initial instruction, you can make a brief acknowledgment of your role (e.g., "I'm ready to assist by looking at what you show me. Please remember to consult a doctor for medical advice."). Then, focus on responding to the user's visual input and questions.
Example of a disclaimer you might use: "As an AI assistant, I can describe what I see, but I can't provide medical advice or diagnoses. For any health concerns, it's always best to speak with a doctor or other healthcare professional."
'''
# Class to handle Gemini-Azure interaction
class GeminiInteractionLoop:
    """Coordinate camera capture, a Gemini session, and Azure text-to-speech.

    ``run_main_loop`` starts up to four cooperating asyncio tasks: camera
    frames are sent to Gemini periodically, typed user text is sent to Gemini,
    Gemini's text responses are logged and queued, and queued responses are
    synthesized with Azure Speech and played back.

    Attributes:
        use_camera: False when the camera was disabled or failed to open.
        use_speech: False when speech was disabled or failed to initialize.
        gemini_session: Session object created in ``run_main_loop``.
        text_to_speech_queue: ``queue.Queue`` of response strings awaiting
            synthesis.
        is_running: Shared flag; every worker loop exits once it is False.
    """

    def __init__(self, gemini_api_key, azure_speech_key, azure_speech_region, use_camera=True, use_speech=True):
        """Store credentials and initialize Gemini, the camera, and Azure Speech.

        Camera and speech failures are logged and downgrade the corresponding
        ``use_*`` flag to False instead of raising.

        Args:
            gemini_api_key: API key passed to ``genai.configure``.
            azure_speech_key: Subscription key for Azure Speech.
            azure_speech_region: Azure region for the Speech resource.
            use_camera: Whether to open camera device 0.
            use_speech: Whether to set up speech synthesis.
        """
        self.gemini_api_key = gemini_api_key
        self.azure_speech_key = azure_speech_key
        self.azure_speech_region = azure_speech_region
        self.use_camera = use_camera
        self.use_speech = use_speech

        # Initialize Gemini API
        # NOTE(review): the file binds `genai` via `import google as genai` but
        # takes Content/Part from `google.genai.types`; `configure` and
        # `GenerativeModel` look like the older `google.generativeai` interface.
        # Confirm which SDK is intended - these calls are kept as written.
        genai.configure(api_key=self.gemini_api_key)
        self.model = genai.GenerativeModel('gemini-pro-vision')
        self.gemini_session = None

        # Initialize camera
        self.camera = None
        if self.use_camera:
            try:
                self.camera = cv2.VideoCapture(0)
                if not self.camera.isOpened():
                    logging.error("Failed to open camera device")
                    self.use_camera = False
            except Exception as e:
                logging.error(f"Error initializing camera: {e}")
                self.use_camera = False

        # Initialize Azure Speech Service. The attributes always exist so that
        # later code never hits AttributeError when speech is disabled.
        self.speech_config = None
        self.output_path = None
        if self.use_speech:
            try:
                self.speech_config = SpeechConfig(subscription=self.azure_speech_key, region=self.azure_speech_region)
                self.speech_config.speech_synthesis_voice_name = "en-US-JennyNeural"
                self.output_path = os.path.join(os.getcwd(), "temp_audio")
                os.makedirs(self.output_path, exist_ok=True)
            except Exception as e:
                logging.error(f"Error initializing Azure Speech Service: {e}")
                self.use_speech = False

        # Hand-off between the response task and the text-to-speech task.
        self.text_to_speech_queue = Queue()
        self.is_running = True

    # Capture image from camera
    def capture_image(self):
        """Read one frame from the camera.

        Returns:
            The frame returned by ``camera.read()``, or None when the camera
            is disabled, missing, or the read fails.
        """
        if not self.use_camera or self.camera is None:
            return None
        ret, frame = self.camera.read()
        if not ret:
            logging.error("Failed to capture image from camera")
            return None
        return frame

    async def _send_content(self, content):
        """Send one ``Content`` object over the current Gemini session."""
        # NOTE(review): assigning `session.content` and then calling
        # `send_client_content()` with no arguments is kept from the original;
        # verify this call shape against the SDK actually in use.
        self.gemini_session.content = content
        await self.gemini_session.send_client_content()

    async def _send_frame(self, frame):
        """JPEG-encode ``frame`` and send it to Gemini; failures are logged."""
        try:
            encoded_ok, encoded_image = cv2.imencode(".jpg", frame)
            if not encoded_ok:
                # imencode reports failure through its first return value.
                logging.error("Failed to encode camera frame as JPEG")
                return
            # Convert to format expected by Gemini
            # NOTE(review): confirm `Part.from_data` exists in the imported
            # `google.genai.types`; it is kept as written.
            image_part = Part.from_data(mime_type="image/jpeg", data=encoded_image.tobytes())
            await self._send_content(Content(role="user", parts=[image_part]))
            logging.info("Sent image to Gemini")
        except Exception as e:
            logging.error(f"Error sending image to Gemini: {e}")

    # Stream media to Gemini
    async def stream_media_to_gemini(self):
        """Send a camera frame to Gemini every few seconds while running."""
        logging.info("Starting media stream to Gemini...")
        try:
            interval = 5  # seconds between frames
            last_capture_time = None
            while self.is_running:
                # Monotonic time is immune to wall-clock adjustments.
                current_time = time.monotonic()
                if last_capture_time is None or current_time - last_capture_time >= interval:
                    frame = self.capture_image()
                    if frame is not None:
                        await self._send_frame(frame)
                    last_capture_time = current_time
                await asyncio.sleep(1)
        except Exception as e:
            logging.error(f"Exception in stream_media_to_gemini: {e}")

    # Send text input to Gemini
    async def send_text_input_to_gemini(self, text):
        """Send ``text`` as a user turn; no-op for empty text or no session."""
        if not text or not self.gemini_session:
            return
        try:
            # Built the same way as the system prompt in run_main_loop.
            content = Content(role="user", parts=[Part(text=text)])
            await self._send_content(content)
            logging.info(f"Sent text to Gemini: {text}")
        except Exception as e:
            logging.error(f"Error sending text to Gemini: {e}")

    # Process user text input
    async def process_text_input(self):
        """Read console lines and forward them to Gemini until 'exit'.

        Sets ``is_running`` to False when the user types 'exit' or when
        reading fails.
        """
        logging.info("Starting text input processing...")
        loop = asyncio.get_running_loop()
        try:
            while self.is_running:
                # input() blocks, so it runs in a worker thread to keep the
                # other tasks running. The blocked input() call itself cannot
                # be cancelled; the worker thread stays parked until the user
                # presses Enter.
                user_input = await loop.run_in_executor(None, input, "Enter text (or 'exit' to quit): ")
                if user_input.strip().lower() == 'exit':
                    self.is_running = False
                    break
                await self.send_text_input_to_gemini(user_input)
        except Exception as e:
            logging.error(f"Exception in process_text_input: {e}")
            self.is_running = False

    # Process responses from Gemini
    async def process_gemini_responses(self):
        """Log each text response from the session and queue it for speech.

        Sets ``is_running`` to False if iterating the session raises.
        """
        logging.info("Starting Gemini response processing...")
        try:
            async for response in self.gemini_session:
                if not self.is_running:
                    break
                try:
                    text = getattr(response, 'text', None)
                    if text:
                        logging.info(f"Gemini response: {text}")
                        if self.use_speech:
                            self.text_to_speech_queue.put(text)
                except Exception as e:
                    logging.error(f"Error processing Gemini response: {e}")
        except Exception as e:
            logging.error(f"Exception in process_gemini_responses: {e}")
            self.is_running = False

    # Text-to-speech processor
    async def text_to_speech_processor(self):
        """Synthesize queued responses; drain the queue before exiting."""
        logging.info("Starting text-to-speech processor...")
        if not self.use_speech:
            return
        try:
            # Keep going after shutdown is requested until the queue is empty.
            while self.is_running or not self.text_to_speech_queue.empty():
                if self.text_to_speech_queue.empty():
                    await asyncio.sleep(0.5)
                    continue
                text = self.text_to_speech_queue.get()
                await self._synthesize_speech(text)
        except Exception as e:
            logging.error(f"Exception in text_to_speech_processor: {e}")

    @staticmethod
    def _remove_temp_file(file_path):
        """Delete ``file_path``; a missing file is not an error."""
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass
        except OSError as e:
            logging.warning(f"Failed to remove temp audio file {file_path}: {e}")

    # Synthesize speech
    async def _synthesize_speech(self, text):
        """Synthesize ``text`` to a temporary WAV file and play it.

        The temporary file is removed on every path, including cancelled
        synthesis and errors.
        """
        if not self.use_speech:
            return
        # Generate unique filename
        file_path = os.path.join(self.output_path, f"speech_{uuid.uuid4()}.wav")
        loop = asyncio.get_running_loop()
        try:
            audio_config = AudioConfig(filename=file_path)
            synthesizer = SpeechSynthesizer(speech_config=self.speech_config, audio_config=audio_config)
            # .get() blocks until synthesis finishes, so wait in a worker thread.
            pending_result = synthesizer.speak_text_async(text)
            result = await loop.run_in_executor(None, pending_result.get)
            if result.reason == ResultReason.SynthesizingAudioCompleted:
                logging.info(f"Speech synthesized and saved to {file_path}")
                await self._play_audio(file_path)
            elif result.reason == ResultReason.Canceled:
                cancellation = result.cancellation_details
                logging.error(f"Speech synthesis canceled: {cancellation.reason}")
                if cancellation.reason == CancellationReason.Error:
                    logging.error(f"Error details: {cancellation.error_details}")
        except Exception as e:
            logging.error(f"Error in speech synthesis: {e}")
        finally:
            # Already removed after successful playback; this covers the rest.
            self._remove_temp_file(file_path)

    @staticmethod
    def _play_audio_blocking(file_path):
        """Read ``file_path`` and play it, returning when playback is done."""
        data, fs = sf.read(file_path)
        sd.play(data, fs)
        sd.wait()  # Wait until playback is done

    # Play audio
    async def _play_audio(self, file_path):
        """Play ``file_path`` and then delete it, even if playback fails."""
        loop = asyncio.get_running_loop()
        try:
            # Playback blocks for the clip's duration; keep it off the loop.
            await loop.run_in_executor(None, self._play_audio_blocking, file_path)
        except Exception as e:
            logging.error(f"Error playing audio: {e}")
        finally:
            self._remove_temp_file(file_path)

    # Main loop
    async def run_main_loop(self):
        """Open the Gemini session, run all worker tasks, then clean up.

        Shutdown starts as soon as ``is_running`` becomes False (user typed
        'exit', or a control task failed). Tasks that would otherwise wait
        forever are cancelled; the speech task is allowed to drain its queue.
        """
        try:
            logging.info("Initializing Gemini session...")
            # NOTE(review): confirm `start_session_async` exists on the model
            # object of the SDK in use; kept as written.
            self.gemini_session = await self.model.start_session_async()

            # Send system prompt
            try:
                logging.info("Sending system prompt to Gemini...")
                system_content = Content(
                    role="user",
                    parts=[Part(text=MEDICAL_ASSISTANT_SYSTEM_PROMPT)]
                )
                await self._send_content(system_content)
                logging.info("System prompt sent successfully.")
            except Exception as e:
                logging.error(f"Failed to send system prompt: {e}", exc_info=True)
                self.is_running = False
                return

            tasks = []
            control_tasks = []  # tasks whose completion can end the session
            interrupted = False
            try:
                logging.info("Creating async tasks for Gemini interaction...")
                tasks.append(asyncio.create_task(self.stream_media_to_gemini(), name="stream_media_to_gemini"))
                text_input_task = asyncio.create_task(self.process_text_input(), name="process_text_input")
                gemini_response_task = asyncio.create_task(self.process_gemini_responses(), name="process_gemini_responses")
                control_tasks = [text_input_task, gemini_response_task]
                tasks.extend(control_tasks)
                if self.use_speech:
                    tasks.append(asyncio.create_task(self.text_to_speech_processor(), name="text_to_speech_processor"))

                # Waiting on every task at once would hang after 'exit': the
                # response task stays parked in `async for` until another
                # response arrives. Wake up whenever a control task finishes
                # and stop once shutdown has been requested.
                pending = set(control_tasks)
                while self.is_running and pending:
                    _, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            except asyncio.CancelledError:
                logging.info("Main loop tasks cancelled")
                interrupted = True
            except Exception as e:
                logging.error(f"Exception in main loop tasks: {e}")
                interrupted = True
            finally:
                self.is_running = False
                for task in tasks:
                    if task.done():
                        continue
                    # Control tasks may be blocked indefinitely; the other
                    # workers watch is_running and finish by themselves unless
                    # the whole loop was interrupted.
                    if interrupted or task in control_tasks:
                        task.cancel()
                results = await asyncio.gather(*tasks, return_exceptions=True)
                for task, result in zip(tasks, results):
                    if isinstance(result, asyncio.CancelledError):
                        logging.info(f"Task {task.get_name()} cancelled")
                    elif isinstance(result, Exception):
                        logging.error(f"Task {task.get_name()} failed: {result}")
        except Exception as e:
            logging.error(f"Exception in run_main_loop: {e}")
        finally:
            # Cleanup
            self.cleanup()
            if self.gemini_session is not None:
                try:
                    await self.gemini_session.close()
                except Exception as e:
                    logging.error(f"Error closing Gemini session: {e}")

    # Clean up resources
    def cleanup(self):
        """Release the camera if one was created; safe to call repeatedly."""
        logging.info("Cleaning up resources...")
        # Release even when use_camera was turned off after a failed open.
        if self.camera is not None:
            self.camera.release()
            self.camera = None
# Main function
def main():
    """Parse command-line options, run the interaction loop, return a status.

    Keys fall back to the GEMINI_API_KEY, AZURE_SPEECH_KEY and
    AZURE_SPEECH_REGION environment variables. A missing Gemini key aborts;
    a missing Azure key only disables speech synthesis.

    Returns:
        0 on normal completion or keyboard interrupt, 1 when the Gemini key
        is missing or an unhandled exception occurs.
    """
    env = os.environ
    cli = argparse.ArgumentParser(description="Medical Assistant using Gemini and Azure Speech")

    # Options that take a value, each with an environment-variable fallback.
    valued_options = (
        ("--gemini-api-key", "Gemini API Key", env.get("GEMINI_API_KEY")),
        ("--azure-speech-key", "Azure Speech API Key", env.get("AZURE_SPEECH_KEY")),
        ("--azure-speech-region", "Azure Speech Region", env.get("AZURE_SPEECH_REGION", "eastus")),
    )
    for flag, help_text, fallback in valued_options:
        cli.add_argument(flag, help=help_text, default=fallback)

    # Boolean switches that turn a feature off.
    switches = (
        ("--no-camera", "Disable camera usage"),
        ("--no-speech", "Disable speech synthesis"),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, help=help_text, action="store_true")

    opts = cli.parse_args()

    # The Gemini key is the only hard requirement.
    if not opts.gemini_api_key:
        print("Error: Gemini API Key is required. Provide it via --gemini-api-key or GEMINI_API_KEY environment variable.")
        return 1

    # Without an Azure key the assistant still runs, just without speech.
    if not opts.azure_speech_key:
        opts.no_speech = True
        logging.warning("No Azure Speech Key provided. Speech synthesis will be disabled.")

    exit_code = 0
    try:
        runner = GeminiInteractionLoop(
            gemini_api_key=opts.gemini_api_key,
            azure_speech_key=opts.azure_speech_key,
            azure_speech_region=opts.azure_speech_region,
            use_camera=not opts.no_camera,
            use_speech=not opts.no_speech,
        )
        asyncio.run(runner.run_main_loop())
    except KeyboardInterrupt:
        logging.info("Keyboard interrupt received. Shutting down...")
    except Exception as e:
        logging.error(f"Unhandled exception: {e}", exc_info=True)
        exit_code = 1
    return exit_code
if __name__ == "__main__":
    # Hand main()'s integer status to the shell as the process exit code.
    raise SystemExit(main())
# End of file (stray table-cell delimiter from the copied file view removed).