# Medical Assistant: streams camera frames and typed text to Gemini and speaks
# the model's replies through Azure Speech synthesis.
# Standard library
import argparse
import asyncio
import datetime
import logging
import os
import sys
import time
import uuid
from queue import Queue

# Third-party
import cv2
import numpy as np
import sounddevice as sd
import soundfile as sf
# FIX: was `import google as genai` -- the bare `google` namespace package has no
# `configure` or `GenerativeModel`; those live in the legacy Gemini SDK module.
import google.generativeai as genai
# NOTE(review): this mixes the newer google-genai type module with the legacy SDK
# imported above -- confirm which Gemini SDK this project actually pins.
from google.genai.types import Content, Part
from azure.cognitiveservices.speech import SpeechConfig, SpeechSynthesizer, AudioConfig, ResultReason, CancellationReason
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s:%(name)s:%(message)s')
# Define system prompt for the medical assistant
# Sent once at session start (see GeminiInteractionLoop.run_main_loop). It pins
# the model to a non-diagnostic, disclaimer-first role for all later turns.
MEDICAL_ASSISTANT_SYSTEM_PROMPT = '''You are an AI Medical Assistant. Your primary function is to analyze visual information from the user's camera or screen and respond via voice.
Your responsibilities are:
1. **Visual Observation and Description:** Carefully examine the images or video feed. Describe relevant details you observe.
2. **General Information (Non-Diagnostic):** Provide general information related to what is visually presented, if applicable. You are not a diagnostic tool.
3. **Safety and Disclaimer (CRITICAL):**
* You are an AI assistant, **NOT a medical doctor or a substitute for one.**
* **DO NOT provide medical diagnoses, treatment advice, or interpret medical results (e.g., X-rays, scans, lab reports).**
* When appropriate, and always if the user seems to be seeking diagnosis or treatment, explicitly state your limitations and **strongly advise the user to consult a qualified healthcare professional.**
* If you see something that *appears* visually concerning (e.g., an unusual skin lesion, signs of injury), you may gently suggest it might be wise to have it looked at by a professional, without speculating on what it is.
4. **Tone:** Maintain a helpful, empathetic, and calm tone.
5. **Interaction:** After this initial instruction, you can make a brief acknowledgment of your role (e.g., "I'm ready to assist by looking at what you show me. Please remember to consult a doctor for medical advice."). Then, focus on responding to the user's visual input and questions.
Example of a disclaimer you might use: "As an AI assistant, I can describe what I see, but I can't provide medical advice or diagnoses. For any health concerns, it's always best to speak with a doctor or other healthcare professional."
'''
# Class to handle Gemini-Azure interaction
class GeminiInteractionLoop:
    """Drives a live assistant session: streams camera frames and typed text to
    Gemini, then speaks Gemini's text replies through Azure Speech.

    NOTE(review): the Gemini session usage here (``start_session_async``,
    assigning ``.content`` and then calling ``send_client_content()`` with no
    arguments, and ``async for`` over the session object) does not match the
    documented google-generativeai API -- confirm against the SDK version this
    project actually pins before relying on it.
    """

    def __init__(self, gemini_api_key, azure_speech_key, azure_speech_region, use_camera=True, use_speech=True):
        """Store credentials and eagerly initialize Gemini, camera, and Azure TTS.

        Camera/speech initialization failures are non-fatal: the corresponding
        ``use_*`` flag is flipped off and the loop runs in a degraded mode.
        """
        self.gemini_api_key = gemini_api_key
        self.azure_speech_key = azure_speech_key
        self.azure_speech_region = azure_speech_region
        self.use_camera = use_camera
        self.use_speech = use_speech
        # Initialize Gemini API
        genai.configure(api_key=self.gemini_api_key)
        self.model = genai.GenerativeModel('gemini-pro-vision')
        self.gemini_session = None
        # Initialize camera (device 0); disable camera mode if it cannot open.
        self.camera = None
        if self.use_camera:
            try:
                self.camera = cv2.VideoCapture(0)
                if not self.camera.isOpened():
                    logging.error("Failed to open camera device")
                    self.use_camera = False
            except Exception as e:
                logging.error(f"Error initializing camera: {e}")
                self.use_camera = False
        # Initialize Azure Speech Service; disable speech mode on failure.
        if self.use_speech:
            try:
                self.speech_config = SpeechConfig(subscription=self.azure_speech_key, region=self.azure_speech_region)
                self.speech_config.speech_synthesis_voice_name = "en-US-JennyNeural"
                self.output_path = os.path.join(os.getcwd(), "temp_audio")
                os.makedirs(self.output_path, exist_ok=True)
            except Exception as e:
                logging.error(f"Error initializing Azure Speech Service: {e}")
                self.use_speech = False
        # Hand-off queue from the Gemini response task to the TTS task.
        # queue.Queue is only ever polled non-blockingly (empty() then get()),
        # so the synchronous queue is safe inside the async loop.
        self.text_to_speech_queue = Queue()
        self.is_running = True

    # Capture image from camera
    def capture_image(self):
        """Return one frame from the camera, or None if capture is unavailable."""
        if not self.use_camera or self.camera is None:
            return None
        ret, frame = self.camera.read()
        if not ret:
            logging.error("Failed to capture image from camera")
            return None
        return frame

    # Stream media to Gemini
    async def stream_media_to_gemini(self):
        """Every ``interval`` seconds, capture a frame, JPEG-encode it, and send
        it to the Gemini session. Runs until ``is_running`` is cleared."""
        logging.info("Starting media stream to Gemini...")
        try:
            interval = 5  # seconds between frames
            last_capture_time = 0
            while self.is_running:
                current_time = time.time()
                if current_time - last_capture_time >= interval:
                    # FIX: cv2.VideoCapture.read() is a blocking call; run it in
                    # a worker thread so the event loop keeps servicing tasks.
                    frame = await asyncio.to_thread(self.capture_image)
                    if frame is not None:
                        _, encoded_image = cv2.imencode(".jpg", frame)
                        image_bytes = encoded_image.tobytes()
                        try:
                            # Convert to format expected by Gemini
                            image_part = Part.from_data(mime_type="image/jpeg", data=image_bytes)
                            content = Content(role="user", parts=[image_part])
                            # Send to Gemini
                            self.gemini_session.content = content
                            await self.gemini_session.send_client_content()
                            logging.info("Sent image to Gemini")
                        except Exception as e:
                            logging.error(f"Error sending image to Gemini: {e}")
                    last_capture_time = current_time
                await asyncio.sleep(1)
        except Exception as e:
            logging.error(f"Exception in stream_media_to_gemini: {e}")

    # Send text input to Gemini
    async def send_text_input_to_gemini(self, text):
        """Send one user text turn to the Gemini session; no-op if no session."""
        if not text or not self.gemini_session:
            return
        try:
            # Create content with text
            # NOTE(review): Part.from_text here vs Part(text=...) in
            # run_main_loop -- confirm which constructor the pinned SDK expects.
            text_part = Part.from_text(text)
            content = Content(role="user", parts=[text_part])
            # Send to Gemini
            self.gemini_session.content = content
            await self.gemini_session.send_client_content()
            logging.info(f"Sent text to Gemini: {text}")
        except Exception as e:
            logging.error(f"Error sending text to Gemini: {e}")

    # Process user text input
    async def process_text_input(self):
        """Read typed input and forward it to Gemini; 'exit' stops the loop."""
        logging.info("Starting text input processing...")
        try:
            while self.is_running:
                # FIX: built-in input() blocks the event loop, starving the
                # media-stream/response/TTS tasks for the whole time the user is
                # typing; run it in a worker thread instead (Python 3.9+).
                user_input = await asyncio.to_thread(input, "Enter text (or 'exit' to quit): ")
                if user_input.lower() == 'exit':
                    self.is_running = False
                    break
                await self.send_text_input_to_gemini(user_input)
        except Exception as e:
            logging.error(f"Exception in process_text_input: {e}")
            self.is_running = False

    # Process responses from Gemini
    async def process_gemini_responses(self):
        """Consume streamed Gemini responses; queue any text for TTS playback."""
        logging.info("Starting Gemini response processing...")
        try:
            # NOTE(review): assumes the session object is an async iterator of
            # responses with an optional .text attribute -- confirm with the SDK.
            async for response in self.gemini_session:
                if not self.is_running:
                    break
                try:
                    # Process content
                    if hasattr(response, 'text'):
                        text = response.text
                        if text:
                            logging.info(f"Gemini response: {text}")
                            if self.use_speech:
                                self.text_to_speech_queue.put(text)
                except Exception as e:
                    logging.error(f"Error processing Gemini response: {e}")
        except Exception as e:
            logging.error(f"Exception in process_gemini_responses: {e}")
            self.is_running = False

    # Text-to-speech processor
    async def text_to_speech_processor(self):
        """Drain the TTS queue, speaking each queued text; keeps draining after
        shutdown until the queue is empty."""
        logging.info("Starting text-to-speech processor...")
        if not self.use_speech:
            return
        try:
            while self.is_running or not self.text_to_speech_queue.empty():
                if not self.text_to_speech_queue.empty():
                    text = self.text_to_speech_queue.get()
                    await self._synthesize_speech(text)
                else:
                    await asyncio.sleep(0.5)
        except Exception as e:
            logging.error(f"Exception in text_to_speech_processor: {e}")

    def _synthesize_speech_blocking(self, text, file_path):
        """Run the blocking Azure synthesis call; returns the SDK result object."""
        # Configure output
        audio_config = AudioConfig(filename=file_path)
        # Create synthesizer
        synthesizer = SpeechSynthesizer(speech_config=self.speech_config, audio_config=audio_config)
        # speak_text_async(...).get() waits for synthesis to finish.
        return synthesizer.speak_text_async(text).get()

    # Synthesize speech
    async def _synthesize_speech(self, text):
        """Synthesize ``text`` to a unique temp WAV via Azure, then play it."""
        if not self.use_speech:
            return
        try:
            # Generate unique filename
            file_path = os.path.join(self.output_path, f"speech_{uuid.uuid4()}.wav")
            # FIX: .speak_text_async(text).get() blocks until synthesis is done,
            # freezing the event loop; off-load the whole call to a thread.
            result = await asyncio.to_thread(self._synthesize_speech_blocking, text, file_path)
            # Check result
            if result.reason == ResultReason.SynthesizingAudioCompleted:
                logging.info(f"Speech synthesized and saved to {file_path}")
                # Play audio
                await self._play_audio(file_path)
            elif result.reason == ResultReason.Canceled:
                cancellation = result.cancellation_details
                logging.error(f"Speech synthesis canceled: {cancellation.reason}")
                if cancellation.reason == CancellationReason.Error:
                    logging.error(f"Error details: {cancellation.error_details}")
        except Exception as e:
            logging.error(f"Error in speech synthesis: {e}")

    @staticmethod
    def _play_audio_blocking(file_path):
        """Blocking read-and-play of a WAV file via sounddevice."""
        data, fs = sf.read(file_path)
        sd.play(data, fs)
        sd.wait()  # Wait until playback is done

    # Play audio
    async def _play_audio(self, file_path):
        """Play a synthesized WAV without blocking the event loop, then delete it."""
        try:
            # FIX: sf.read + sd.wait() block for the full clip duration; run the
            # playback in a worker thread so other tasks keep running.
            await asyncio.to_thread(self._play_audio_blocking, file_path)
            # Clean up file
            try:
                os.remove(file_path)
            except Exception as e:
                logging.warning(f"Failed to remove temp audio file {file_path}: {e}")
        except Exception as e:
            logging.error(f"Error playing audio: {e}")

    # Main loop
    async def run_main_loop(self):
        """Open the Gemini session, send the system prompt, then run the
        stream/input/response/TTS tasks concurrently until shutdown."""
        try:
            logging.info("Initializing Gemini session...")
            self.gemini_session = await self.model.start_session_async()
            # Send system prompt
            try:
                logging.info("Sending system prompt to Gemini...")
                # Create Content object correctly
                system_content = Content(
                    role="user",
                    parts=[Part(text=MEDICAL_ASSISTANT_SYSTEM_PROMPT)]
                )
                # Set the content property before calling send_client_content
                self.gemini_session.content = system_content
                # Call send_client_content without arguments
                await self.gemini_session.send_client_content()
                logging.info("System prompt sent successfully.")
            except Exception as e:
                logging.error(f"Failed to send system prompt: {e}", exc_info=True)
                self.is_running = False
                return
            tasks = []
            try:
                logging.info("Creating async tasks for Gemini interaction...")
                media_stream_task = asyncio.create_task(self.stream_media_to_gemini(), name="stream_media_to_gemini")
                tasks.append(media_stream_task)
                text_input_task = asyncio.create_task(self.process_text_input(), name="process_text_input")
                tasks.append(text_input_task)
                gemini_response_task = asyncio.create_task(self.process_gemini_responses(), name="process_gemini_responses")
                tasks.append(gemini_response_task)
                if self.use_speech:
                    tts_task = asyncio.create_task(self.text_to_speech_processor(), name="text_to_speech_processor")
                    tasks.append(tts_task)
                await asyncio.gather(*tasks)
            except asyncio.CancelledError:
                logging.info("Main loop tasks cancelled")
            except Exception as e:
                logging.error(f"Exception in main loop tasks: {e}")
            finally:
                # Cancel tasks
                for task in tasks:
                    if not task.done():
                        task.cancel()
                        try:
                            await task
                        except asyncio.CancelledError:
                            logging.info(f"Task {task.get_name()} cancelled")
        except Exception as e:
            logging.error(f"Exception in run_main_loop: {e}")
        finally:
            # Cleanup
            logging.info("Cleaning up resources...")
            if self.camera is not None and self.use_camera:
                self.camera.release()
            if self.gemini_session is not None:
                await self.gemini_session.close()

    # Clean up resources
    def cleanup(self):
        """Release the camera. Safe to call even if run_main_loop already did."""
        logging.info("Cleaning up resources...")
        if self.camera is not None and self.use_camera:
            self.camera.release()
# Main function
def main():
    """Parse CLI options, build the interaction loop, and run it to completion.

    Returns a process exit code: 0 on clean shutdown (including Ctrl-C),
    1 when the Gemini key is missing or an unhandled error occurs.
    """
    parser = argparse.ArgumentParser(description="Medical Assistant using Gemini and Azure Speech")
    # Option table: (flag, keyword arguments) pairs, registered in one pass.
    option_specs = [
        ("--gemini-api-key", {"help": "Gemini API Key", "default": os.environ.get("GEMINI_API_KEY")}),
        ("--azure-speech-key", {"help": "Azure Speech API Key", "default": os.environ.get("AZURE_SPEECH_KEY")}),
        ("--azure-speech-region", {"help": "Azure Speech Region", "default": os.environ.get("AZURE_SPEECH_REGION", "eastus")}),
        ("--no-camera", {"help": "Disable camera usage", "action": "store_true"}),
        ("--no-speech", {"help": "Disable speech synthesis", "action": "store_true"}),
    ]
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    args = parser.parse_args()

    # Guard clause: the Gemini key is the only hard requirement.
    if not args.gemini_api_key:
        print("Error: Gemini API Key is required. Provide it via --gemini-api-key or GEMINI_API_KEY environment variable.")
        return 1
    # A missing Azure key merely downgrades to silent (no-speech) operation.
    if not args.azure_speech_key:
        args.no_speech = True
        logging.warning("No Azure Speech Key provided. Speech synthesis will be disabled.")

    try:
        session = GeminiInteractionLoop(
            gemini_api_key=args.gemini_api_key,
            azure_speech_key=args.azure_speech_key,
            azure_speech_region=args.azure_speech_region,
            use_camera=not args.no_camera,
            use_speech=not args.no_speech,
        )
        asyncio.run(session.run_main_loop())
    except KeyboardInterrupt:
        logging.info("Keyboard interrupt received. Shutting down...")
    except Exception as e:
        logging.error(f"Unhandled exception: {e}", exc_info=True)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())