"""Camera-to-Gemini assistant with optional Azure text-to-speech output.

The script periodically captures a camera frame, sends it (and any text typed
on stdin) to a Gemini Live session, and speaks the model's text replies through
Azure Speech.  Run it as a script; see ``main`` for the command-line options.
"""

import os
import sys
import asyncio
import logging
import datetime  # noqa: F401  (kept: other code may rely on this import)
import argparse
import threading
import time
import uuid
from queue import Empty, Queue

import numpy as np  # noqa: F401  (kept: other code may rely on this import)
import cv2
import sounddevice as sd
import soundfile as sf
from azure.cognitiveservices.speech import (  # noqa: F401  (AudioConfig kept)
    SpeechConfig,
    SpeechSynthesizer,
    AudioConfig,
    ResultReason,
    CancellationReason,
)
from azure.cognitiveservices.speech.audio import AudioOutputConfig
from google import genai
from google.genai.types import Content, Part

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s:%(name)s:%(message)s',
)
logger = logging.getLogger(__name__)

# NOTE(review): the original code requested 'gemini-pro-vision', which is not
# a Live API model.  Confirm which Live-capable model the project should use.
DEFAULT_GEMINI_LIVE_MODEL = "gemini-2.0-flash-live-001"

# Replies are spoken through Azure, so only text is requested from the model.
GEMINI_LIVE_CONFIG = {"response_modalities": ["TEXT"]}

# Seconds between two camera frames sent to Gemini.
FRAME_INTERVAL_SECONDS = 5

# Define system prompt for the medical assistant
MEDICAL_ASSISTANT_SYSTEM_PROMPT = '''You are an AI Medical Assistant. Your primary function is to analyze visual information from the user's camera or screen and respond via voice.

Your responsibilities are:
1. **Visual Observation and Description:** Carefully examine the images or video feed. Describe relevant details you observe.
2. **General Information (Non-Diagnostic):** Provide general information related to what is visually presented, if applicable. You are not a diagnostic tool.
3. **Safety and Disclaimer (CRITICAL):**
    * You are an AI assistant, **NOT a medical doctor or a substitute for one.**
    * **DO NOT provide medical diagnoses, treatment advice, or interpret medical results (e.g., X-rays, scans, lab reports).**
    * When appropriate, and always if the user seems to be seeking diagnosis or treatment, explicitly state your limitations and **strongly advise the user to consult a qualified healthcare professional.**
    * If you see something that *appears* visually concerning (e.g., an unusual skin lesion, signs of injury), you may gently suggest it might be wise to have it looked at by a professional, without speculating on what it is.
4. **Tone:** Maintain a helpful, empathetic, and calm tone.
5. **Interaction:** After this initial instruction, you can make a brief acknowledgment of your role (e.g., "I'm ready to assist by looking at what you show me. Please remember to consult a doctor for medical advice."). Then, focus on responding to the user's visual input and questions.

Example of a disclaimer you might use: "As an AI assistant, I can describe what I see, but I can't provide medical advice or diagnoses. For any health concerns, it's always best to speak with a doctor or other healthcare professional."
'''


async def _read_line_async(prompt):
    """Read one line from stdin without blocking the event loop.

    ``input`` runs in a daemon thread so that the other coroutines keep
    running while the user types, and so that a pending prompt does not keep
    the process alive at shutdown.

    Args:
        prompt: Text shown to the user.

    Returns:
        The line entered by the user.

    Raises:
        Exception: Whatever ``input`` raised in the reader thread (for example
            ``EOFError`` when stdin is closed).
    """
    loop = asyncio.get_running_loop()
    outcome = loop.create_future()

    def _deliver(setter, value):
        # The awaiting task may have been cancelled while the user was typing.
        if not outcome.done():
            setter(value)

    def _reader():
        try:
            line = input(prompt)
        except Exception as exc:
            delivery = (outcome.set_exception, exc)
        else:
            delivery = (outcome.set_result, line)
        try:
            loop.call_soon_threadsafe(_deliver, *delivery)
        except RuntimeError:
            # The event loop was closed before the user answered.
            pass

    threading.Thread(target=_reader, name="stdin-reader", daemon=True).start()
    return await outcome


# Class to handle Gemini-Azure interaction
class GeminiInteractionLoop:
    """Drive one interactive session between camera/stdin, Gemini and Azure TTS.

    Args:
        gemini_api_key: API key passed to ``genai.Client``.
        azure_speech_key: Azure Speech subscription key.
        azure_speech_region: Azure Speech region name.
        use_camera: Capture and send camera frames when True.
        use_speech: Speak Gemini's replies through Azure Speech when True.
        gemini_model: Name of the Gemini Live model to connect to.
    """

    def __init__(self, gemini_api_key, azure_speech_key, azure_speech_region,
                 use_camera=True, use_speech=True,
                 gemini_model=DEFAULT_GEMINI_LIVE_MODEL):
        self.gemini_api_key = gemini_api_key
        self.azure_speech_key = azure_speech_key
        self.azure_speech_region = azure_speech_region
        self.use_camera = use_camera
        self.use_speech = use_speech
        self.gemini_model = gemini_model

        # Initialize Gemini API; the Live session itself is opened in
        # run_main_loop because it needs a running event loop.
        self.client = genai.Client(api_key=self.gemini_api_key)
        self.gemini_session = None

        self.camera = None
        if self.use_camera:
            self._init_camera()

        self.speech_config = None
        self.output_path = None
        if self.use_speech:
            self._init_speech()

        # Thread-safe queue polled by text_to_speech_processor.  A plain
        # queue.Queue is used because this object is built outside the loop.
        self.text_to_speech_queue = Queue()
        self.is_running = True

    def _init_camera(self):
        """Open camera device 0; disable camera usage if that fails."""
        try:
            camera = cv2.VideoCapture(0)
            if camera.isOpened():
                self.camera = camera
                return
            logger.error("Failed to open camera device")
            camera.release()
        except Exception as e:
            logger.error("Error initializing camera: %s", e)
        self.use_camera = False

    def _init_speech(self):
        """Create the Azure speech configuration; disable speech on failure."""
        try:
            self.speech_config = SpeechConfig(
                subscription=self.azure_speech_key,
                region=self.azure_speech_region,
            )
            self.speech_config.speech_synthesis_voice_name = "en-US-JennyNeural"
            self.output_path = os.path.join(os.getcwd(), "temp_audio")
            os.makedirs(self.output_path, exist_ok=True)
        except Exception as e:
            logger.error("Error initializing Azure Speech Service: %s", e)
            self.use_speech = False

    # Capture image from camera
    def capture_image(self):
        """Return the next camera frame, or None if no frame is available."""
        if not self.use_camera or self.camera is None:
            return None
        ok, frame = self.camera.read()
        if not ok:
            logger.error("Failed to capture image from camera")
            return None
        return frame

    async def _send_content(self, content):
        """Send one user turn to the open Gemini Live session."""
        await self.gemini_session.send_client_content(
            turns=content, turn_complete=True
        )

    async def _send_frame(self):
        """Capture one frame, JPEG-encode it and send it to Gemini."""
        frame = self.capture_image()
        if frame is None:
            return
        ok, encoded_image = cv2.imencode(".jpg", frame)
        if not ok:
            logger.error("Failed to encode camera frame as JPEG")
            return
        try:
            # NOTE(review): for a continuous video feed the Live API also
            # offers send_realtime_input; confirm which one is intended.
            image_part = Part.from_bytes(
                data=encoded_image.tobytes(), mime_type="image/jpeg"
            )
            await self._send_content(Content(role="user", parts=[image_part]))
            logger.info("Sent image to Gemini")
        except Exception as e:
            logger.error("Error sending image to Gemini: %s", e)

    # Stream media to Gemini
    async def stream_media_to_gemini(self):
        """Send a camera frame to Gemini every FRAME_INTERVAL_SECONDS.

        Runs until ``is_running`` becomes False.
        """
        logger.info("Starting media stream to Gemini...")
        if self.gemini_session is None:
            logger.error("Cannot stream media: Gemini session is not open")
            return
        try:
            last_capture_time = None  # None forces an immediate first frame
            while self.is_running:
                current_time = time.monotonic()
                if (last_capture_time is None
                        or current_time - last_capture_time
                        >= FRAME_INTERVAL_SECONDS):
                    last_capture_time = current_time
                    await self._send_frame()
                await asyncio.sleep(1)
        except Exception as e:
            logger.error("Exception in stream_media_to_gemini: %s", e)

    # Send text input to Gemini
    async def send_text_input_to_gemini(self, text):
        """Send ``text`` to Gemini as a user turn; no-op if empty or no session."""
        if not text or not self.gemini_session:
            return
        try:
            content = Content(role="user", parts=[Part.from_text(text=text)])
            await self._send_content(content)
            logger.info("Sent text to Gemini: %s", text)
        except Exception as e:
            logger.error("Error sending text to Gemini: %s", e)

    # Process user text input
    async def process_text_input(self):
        """Forward lines typed on stdin to Gemini until the user types 'exit'."""
        logger.info("Starting text input processing...")
        try:
            while self.is_running:
                user_input = await _read_line_async(
                    "Enter text (or 'exit' to quit): "
                )
                if user_input.lower() == 'exit':
                    self.is_running = False
                    break
                await self.send_text_input_to_gemini(user_input)
        except Exception as e:
            logger.error("Exception in process_text_input: %s", e)
            self.is_running = False

    # Process responses from Gemini
    async def process_gemini_responses(self):
        """Log Gemini's replies and queue them for speech synthesis.

        ``session.receive()`` yields the messages of a single model turn, so
        it is re-entered for every turn.  The text chunks of one turn are
        joined so that each reply is spoken as a whole.
        """
        logger.info("Starting Gemini response processing...")
        if self.gemini_session is None:
            logger.error("Cannot read responses: Gemini session is not open")
            self.is_running = False
            return
        try:
            while self.is_running:
                chunks = []
                async for response in self.gemini_session.receive():
                    if not self.is_running:
                        break
                    try:
                        chunk = getattr(response, 'text', None)
                        if chunk:
                            chunks.append(chunk)
                    except Exception as e:
                        logger.error("Error processing Gemini response: %s", e)
                text = "".join(chunks)
                if text:
                    logger.info("Gemini response: %s", text)
                    if self.use_speech:
                        self.text_to_speech_queue.put(text)
                # Yield control even if a turn produced no messages.
                await asyncio.sleep(0)
        except Exception as e:
            logger.error("Exception in process_gemini_responses: %s", e)
            self.is_running = False

    # Text-to-speech processor
    async def text_to_speech_processor(self):
        """Speak queued replies; keeps draining the queue after shutdown starts."""
        logger.info("Starting text-to-speech processor...")
        if not self.use_speech:
            return
        try:
            while True:
                try:
                    text = self.text_to_speech_queue.get_nowait()
                except Empty:
                    if not self.is_running:
                        break
                    await asyncio.sleep(0.5)
                    continue
                await self._synthesize_speech(text)
        except Exception as e:
            logger.error("Exception in text_to_speech_processor: %s", e)

    @staticmethod
    def _remove_temp_file(file_path):
        """Delete a temporary audio file, ignoring a file that is already gone."""
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(
                "Failed to remove temp audio file %s: %s", file_path, e
            )

    # Synthesize speech
    async def _synthesize_speech(self, text):
        """Synthesize ``text`` to a temporary WAV file and play it."""
        if not self.use_speech:
            return
        file_path = os.path.join(
            self.output_path, f"speech_{uuid.uuid4()}.wav"
        )
        try:
            # The synthesizer writes to a file, so it needs an output config.
            audio_config = AudioOutputConfig(filename=file_path)
            synthesizer = SpeechSynthesizer(
                speech_config=self.speech_config, audio_config=audio_config
            )

            # .get() blocks until synthesis finishes; keep it off the loop.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None, lambda: synthesizer.speak_text_async(text).get()
            )

            if result.reason == ResultReason.SynthesizingAudioCompleted:
                logger.info("Speech synthesized and saved to %s", file_path)
                await self._play_audio(file_path)
            elif result.reason == ResultReason.Canceled:
                cancellation = result.cancellation_details
                logger.error(
                    "Speech synthesis canceled: %s", cancellation.reason
                )
                if cancellation.reason == CancellationReason.Error:
                    logger.error(
                        "Error details: %s", cancellation.error_details
                    )
        except Exception as e:
            logger.error("Error in speech synthesis: %s", e)
        finally:
            # Covers the cancelled and failed paths where nothing was played.
            self._remove_temp_file(file_path)

    @staticmethod
    def _play_audio_blocking(file_path):
        """Play an audio file and block until playback has finished."""
        data, fs = sf.read(file_path)
        sd.play(data, fs)
        sd.wait()  # Wait until playback is done

    # Play audio
    async def _play_audio(self, file_path):
        """Play ``file_path`` off the event loop, then delete the file."""
        try:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None, self._play_audio_blocking, file_path
            )
        except Exception as e:
            logger.error("Error playing audio: %s", e)
        finally:
            self._remove_temp_file(file_path)

    async def _send_system_prompt(self):
        """Send the assistant instructions as the first turn; True on success."""
        try:
            logger.info("Sending system prompt to Gemini...")
            system_content = Content(
                role="user",
                parts=[Part(text=MEDICAL_ASSISTANT_SYSTEM_PROMPT)],
            )
            await self._send_content(system_content)
            logger.info("System prompt sent successfully.")
            return True
        except Exception as e:
            logger.error("Failed to send system prompt: %s", e, exc_info=True)
            self.is_running = False
            return False

    @staticmethod
    async def _cancel_tasks(tasks):
        """Cancel unfinished tasks, wait for all of them and log the outcome."""
        for task in tasks:
            if not task.done():
                task.cancel()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for task, result in zip(tasks, results):
            if isinstance(result, asyncio.CancelledError):
                logger.info("Task %s cancelled", task.get_name())
            elif isinstance(result, Exception):
                logger.error("Task %s failed: %s", task.get_name(), result)

    async def _run_tasks(self):
        """Run the worker coroutines until the input or response task ends."""
        logger.info("Creating async tasks for Gemini interaction...")
        text_input_task = asyncio.create_task(
            self.process_text_input(), name="process_text_input"
        )
        gemini_response_task = asyncio.create_task(
            self.process_gemini_responses(), name="process_gemini_responses"
        )
        tasks = [text_input_task, gemini_response_task]
        if self.use_camera:
            tasks.append(asyncio.create_task(
                self.stream_media_to_gemini(), name="stream_media_to_gemini"
            ))
        if self.use_speech:
            tasks.append(asyncio.create_task(
                self.text_to_speech_processor(),
                name="text_to_speech_processor",
            ))

        try:
            # The session is over as soon as the user quits or the response
            # stream ends; the other tasks only poll is_running.
            await asyncio.wait(
                [text_input_task, gemini_response_task],
                return_when=asyncio.FIRST_COMPLETED,
            )
            self.is_running = False
            # These two can wait indefinitely (stdin / network), so stop them
            # explicitly; the remaining tasks finish by themselves, which
            # lets already queued speech play out.
            text_input_task.cancel()
            gemini_response_task.cancel()
            await asyncio.wait(tasks)
        except asyncio.CancelledError:
            logger.info("Main loop tasks cancelled")
            raise
        finally:
            self.is_running = False
            await self._cancel_tasks(tasks)

    # Main loop
    async def run_main_loop(self):
        """Open the Gemini Live session and run until the session is over.

        Errors are logged rather than raised; resources are always released.
        """
        try:
            logger.info("Initializing Gemini session...")
            async with self.client.aio.live.connect(
                model=self.gemini_model, config=GEMINI_LIVE_CONFIG
            ) as session:
                self.gemini_session = session
                if await self._send_system_prompt():
                    await self._run_tasks()
        except Exception as e:
            logger.error("Exception in run_main_loop: %s", e)
        finally:
            self.is_running = False
            # The session is closed by the ``async with`` block above.
            self.gemini_session = None
            self.cleanup()

    # Clean up resources
    def cleanup(self):
        """Release the camera.  Safe to call more than once."""
        logger.info("Cleaning up resources...")
        camera, self.camera = self.camera, None
        if camera is not None:
            camera.release()


# Main function
def main():
    """Parse the command line, run the interaction loop, return an exit code.

    Returns:
        0 on normal termination or keyboard interrupt, 1 when the Gemini API
        key is missing or an unhandled exception occurred.
    """
    parser = argparse.ArgumentParser(
        description="Medical Assistant using Gemini and Azure Speech"
    )
    parser.add_argument("--gemini-api-key", help="Gemini API Key",
                        default=os.environ.get("GEMINI_API_KEY"))
    parser.add_argument("--gemini-model", help="Gemini Live model name",
                        default=os.environ.get("GEMINI_MODEL",
                                               DEFAULT_GEMINI_LIVE_MODEL))
    parser.add_argument("--azure-speech-key", help="Azure Speech API Key",
                        default=os.environ.get("AZURE_SPEECH_KEY"))
    parser.add_argument("--azure-speech-region", help="Azure Speech Region",
                        default=os.environ.get("AZURE_SPEECH_REGION",
                                               "eastus"))
    parser.add_argument("--no-camera", help="Disable camera usage",
                        action="store_true")
    parser.add_argument("--no-speech", help="Disable speech synthesis",
                        action="store_true")
    args = parser.parse_args()

    # Check required parameters
    if not args.gemini_api_key:
        print("Error: Gemini API Key is required. Provide it via --gemini-api-key or GEMINI_API_KEY environment variable.")
        return 1

    # The Azure key is optional: without it the assistant runs text-only.
    if not args.azure_speech_key:
        args.no_speech = True
        logger.warning(
            "No Azure Speech Key provided. Speech synthesis will be disabled."
        )

    try:
        interaction_loop = GeminiInteractionLoop(
            gemini_api_key=args.gemini_api_key,
            azure_speech_key=args.azure_speech_key,
            azure_speech_region=args.azure_speech_region,
            use_camera=not args.no_camera,
            use_speech=not args.no_speech,
            gemini_model=args.gemini_model,
        )
        asyncio.run(interaction_loop.run_main_loop())
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received. Shutting down...")
    except Exception as e:
        logger.error("Unhandled exception: %s", e, exc_info=True)
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())