# Spatial-awares / app.py
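"""Voice-enabled medical assistant demo.

Streams webcam frames and typed text to a Gemini model and speaks the model's
replies aloud via Azure Speech, with non-diagnostic guardrails set by the
system prompt below."""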
import os
import sys
import asyncio
import logging
import datetime
import argparse
import numpy as np
import cv2
from queue import Queue
import time
import google.generativeai as genai  # legacy SDK used for configure() / GenerativeModel()
from google.genai.types import Content, Part  # request content types from the newer google-genai package
from azure.cognitiveservices.speech import SpeechConfig, SpeechSynthesizer, AudioConfig, ResultReason, CancellationReason
import sounddevice as sd
import soundfile as sf
import uuid
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s:%(name)s:%(message)s')
# Define system prompt for the medical assistant
MEDICAL_ASSISTANT_SYSTEM_PROMPT = '''You are an AI Medical Assistant. Your primary function is to analyze visual information from the user's camera or screen and respond via voice.
Your responsibilities are:
1. **Visual Observation and Description:** Carefully examine the images or video feed. Describe relevant details you observe.
2. **General Information (Non-Diagnostic):** Provide general information related to what is visually presented, if applicable. You are not a diagnostic tool.
3. **Safety and Disclaimer (CRITICAL):**
* You are an AI assistant, **NOT a medical doctor or a substitute for one.**
* **DO NOT provide medical diagnoses, treatment advice, or interpret medical results (e.g., X-rays, scans, lab reports).**
* When appropriate, and always if the user seems to be seeking diagnosis or treatment, explicitly state your limitations and **strongly advise the user to consult a qualified healthcare professional.**
* If you see something that *appears* visually concerning (e.g., an unusual skin lesion, signs of injury), you may gently suggest it might be wise to have it looked at by a professional, without speculating on what it is.
4. **Tone:** Maintain a helpful, empathetic, and calm tone.
5. **Interaction:** After this initial instruction, you can make a brief acknowledgment of your role (e.g., "I'm ready to assist by looking at what you show me. Please remember to consult a doctor for medical advice."). Then, focus on responding to the user's visual input and questions.
Example of a disclaimer you might use: "As an AI assistant, I can describe what I see, but I can't provide medical advice or diagnoses. For any health concerns, it's always best to speak with a doctor or other healthcare professional."
'''
# Class to handle Gemini-Azure interaction
class GeminiInteractionLoop:
def __init__(self, gemini_api_key, azure_speech_key, azure_speech_region, use_camera=True, use_speech=True):
self.gemini_api_key = gemini_api_key
self.azure_speech_key = azure_speech_key
self.azure_speech_region = azure_speech_region
self.use_camera = use_camera
self.use_speech = use_speech
# Initialize Gemini API
genai.configure(api_key=self.gemini_api_key)
self.model = genai.GenerativeModel('gemini-pro-vision')
self.gemini_session = None
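        # The streaming session itself is opened later, in run_main_loop()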
# Initialize camera
self.camera = None
if self.use_camera:
try:
self.camera = cv2.VideoCapture(0)
if not self.camera.isOpened():
logging.error("Failed to open camera device")
self.use_camera = False
except Exception as e:
logging.error(f"Error initializing camera: {e}")
self.use_camera = False
# Initialize Azure Speech Service
if self.use_speech:
try:
self.speech_config = SpeechConfig(subscription=self.azure_speech_key, region=self.azure_speech_region)
self.speech_config.speech_synthesis_voice_name = "en-US-JennyNeural"
self.output_path = os.path.join(os.getcwd(), "temp_audio")
os.makedirs(self.output_path, exist_ok=True)
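                # Synthesized replies are written here as temporary WAV files and deleted after playback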
except Exception as e:
logging.error(f"Error initializing Azure Speech Service: {e}")
self.use_speech = False
        # Thread-safe queue handing synthesized text from the response task to the TTS task
        self.text_to_speech_queue = Queue()
self.is_running = True
# Capture image from camera
def capture_image(self):
if not self.use_camera or self.camera is None:
return None
ret, frame = self.camera.read()
if not ret:
logging.error("Failed to capture image from camera")
return None
return frame
# Stream media to Gemini
async def stream_media_to_gemini(self):
logging.info("Starting media stream to Gemini...")
try:
interval = 5 # seconds between frames
last_capture_time = 0
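            # Poll about once per second; capture and send a frame every `interval` seconds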
while self.is_running:
current_time = time.time()
if current_time - last_capture_time >= interval:
                    frame = self.capture_image()
                    if frame is not None:
                        # cv2.imencode returns (success, buffer); skip the frame if encoding fails
                        ok, encoded_image = cv2.imencode(".jpg", frame)
                        if ok:
                            image_bytes = encoded_image.tobytes()
                            try:
                                # Wrap the JPEG bytes in a Part/Content pair for Gemini
                                image_part = Part.from_bytes(data=image_bytes, mime_type="image/jpeg")
                                content = Content(role="user", parts=[image_part])
                                # Send the frame as one complete client turn (assumes a session accepting turns=)
                                await self.gemini_session.send_client_content(turns=content, turn_complete=True)
                                logging.info("Sent image to Gemini")
                            except Exception as e:
                                logging.error(f"Error sending image to Gemini: {e}")
                        else:
                            logging.error("Failed to JPEG-encode captured frame")
                        last_capture_time = current_time
await asyncio.sleep(1)
except Exception as e:
logging.error(f"Exception in stream_media_to_gemini: {e}")
# Send text input to Gemini
async def send_text_input_to_gemini(self, text):
if not text or not self.gemini_session:
return
try:
            # Wrap the text in a Part/Content pair and send it as a complete client turn
            text_part = Part.from_text(text=text)
            content = Content(role="user", parts=[text_part])
            await self.gemini_session.send_client_content(turns=content, turn_complete=True)
logging.info(f"Sent text to Gemini: {text}")
except Exception as e:
logging.error(f"Error sending text to Gemini: {e}")
# Process user text input
async def process_text_input(self):
logging.info("Starting text input processing...")
try:
            while self.is_running:
                # Run blocking input() in a worker thread so it does not stall the event loop
                user_input = await asyncio.to_thread(input, "Enter text (or 'exit' to quit): ")
                if user_input.lower() == 'exit':
                    self.is_running = False
                    break
                await self.send_text_input_to_gemini(user_input)
except Exception as e:
logging.error(f"Exception in process_text_input: {e}")
self.is_running = False
# Process responses from Gemini
async def process_gemini_responses(self):
logging.info("Starting Gemini response processing...")
try:
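            # Assumes the session object can be iterated asynchronously to stream responses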
async for response in self.gemini_session:
if not self.is_running:
break
try:
# Process content
if hasattr(response, 'text'):
text = response.text
if text:
logging.info(f"Gemini response: {text}")
if self.use_speech:
self.text_to_speech_queue.put(text)
except Exception as e:
logging.error(f"Error processing Gemini response: {e}")
except Exception as e:
logging.error(f"Exception in process_gemini_responses: {e}")
self.is_running = False
# Text-to-speech processor
async def text_to_speech_processor(self):
logging.info("Starting text-to-speech processor...")
if not self.use_speech:
return
try:
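            # Keep draining the queue after shutdown is requested so queued replies are still spoken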
while self.is_running or not self.text_to_speech_queue.empty():
if not self.text_to_speech_queue.empty():
text = self.text_to_speech_queue.get()
await self._synthesize_speech(text)
else:
await asyncio.sleep(0.5)
except Exception as e:
logging.error(f"Exception in text_to_speech_processor: {e}")
# Synthesize speech
async def _synthesize_speech(self, text):
if not self.use_speech:
return
try:
# Generate unique filename
file_path = os.path.join(self.output_path, f"speech_{uuid.uuid4()}.wav")
# Configure output
audio_config = AudioConfig(filename=file_path)
# Create synthesizer
synthesizer = SpeechSynthesizer(speech_config=self.speech_config, audio_config=audio_config)
            # Run synthesis off the event loop; .get() blocks until the result is ready
            result = await asyncio.to_thread(synthesizer.speak_text_async(text).get)
# Check result
if result.reason == ResultReason.SynthesizingAudioCompleted:
logging.info(f"Speech synthesized and saved to {file_path}")
# Play audio
await self._play_audio(file_path)
elif result.reason == ResultReason.Canceled:
cancellation = result.cancellation_details
logging.error(f"Speech synthesis canceled: {cancellation.reason}")
if cancellation.reason == CancellationReason.Error:
logging.error(f"Error details: {cancellation.error_details}")
except Exception as e:
logging.error(f"Error in speech synthesis: {e}")
# Play audio
async def _play_audio(self, file_path):
try:
            data, fs = sf.read(file_path)
            sd.play(data, fs)
            await asyncio.to_thread(sd.wait)  # wait for playback to finish without blocking the event loop
# Clean up file
try:
os.remove(file_path)
except Exception as e:
logging.warning(f"Failed to remove temp audio file {file_path}: {e}")
except Exception as e:
logging.error(f"Error playing audio: {e}")
# Main loop
async def run_main_loop(self):
try:
logging.info("Initializing Gemini session...")
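            # NOTE: start_session_async() is assumed here; use the streaming-session call of the installed Gemini SDK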
self.gemini_session = await self.model.start_session_async()
# Send system prompt
try:
logging.info("Sending system prompt to Gemini...")
                # Build the system prompt as the first user turn
                system_content = Content(
                    role="user",
                    parts=[Part(text=MEDICAL_ASSISTANT_SYSTEM_PROMPT)]
                )
                await self.gemini_session.send_client_content(turns=system_content, turn_complete=True)
logging.info("System prompt sent successfully.")
except Exception as e:
logging.error(f"Failed to send system prompt: {e}", exc_info=True)
self.is_running = False
return
tasks = []
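            # Track all long-running tasks so they can be cancelled together on shutdown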
try:
logging.info("Creating async tasks for Gemini interaction...")
media_stream_task = asyncio.create_task(self.stream_media_to_gemini(), name="stream_media_to_gemini")
tasks.append(media_stream_task)
text_input_task = asyncio.create_task(self.process_text_input(), name="process_text_input")
tasks.append(text_input_task)
gemini_response_task = asyncio.create_task(self.process_gemini_responses(), name="process_gemini_responses")
tasks.append(gemini_response_task)
if self.use_speech:
tts_task = asyncio.create_task(self.text_to_speech_processor(), name="text_to_speech_processor")
tasks.append(tts_task)
await asyncio.gather(*tasks)
except asyncio.CancelledError:
logging.info("Main loop tasks cancelled")
except Exception as e:
logging.error(f"Exception in main loop tasks: {e}")
finally:
# Cancel tasks
for task in tasks:
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
logging.info(f"Task {task.get_name()} cancelled")
except Exception as e:
logging.error(f"Exception in run_main_loop: {e}")
finally:
# Cleanup
logging.info("Cleaning up resources...")
if self.camera is not None and self.use_camera:
self.camera.release()
if self.gemini_session is not None:
await self.gemini_session.close()
# Clean up resources
def cleanup(self):
logging.info("Cleaning up resources...")
if self.camera is not None and self.use_camera:
self.camera.release()
# Main function
def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description="Medical Assistant using Gemini and Azure Speech")
parser.add_argument("--gemini-api-key", help="Gemini API Key", default=os.environ.get("GEMINI_API_KEY"))
parser.add_argument("--azure-speech-key", help="Azure Speech API Key", default=os.environ.get("AZURE_SPEECH_KEY"))
parser.add_argument("--azure-speech-region", help="Azure Speech Region", default=os.environ.get("AZURE_SPEECH_REGION", "eastus"))
parser.add_argument("--no-camera", help="Disable camera usage", action="store_true")
parser.add_argument("--no-speech", help="Disable speech synthesis", action="store_true")
args = parser.parse_args()
# Check required parameters
if not args.gemini_api_key:
print("Error: Gemini API Key is required. Provide it via --gemini-api-key or GEMINI_API_KEY environment variable.")
return 1
    # Speech is optional: fall back to text-only operation when no Azure Speech key is provided
if not args.azure_speech_key:
args.no_speech = True
logging.warning("No Azure Speech Key provided. Speech synthesis will be disabled.")
try:
# Create interaction loop
interaction_loop = GeminiInteractionLoop(
gemini_api_key=args.gemini_api_key,
azure_speech_key=args.azure_speech_key,
azure_speech_region=args.azure_speech_region,
use_camera=not args.no_camera,
use_speech=not args.no_speech
)
# Run main loop
asyncio.run(interaction_loop.run_main_loop())
except KeyboardInterrupt:
logging.info("Keyboard interrupt received. Shutting down...")
except Exception as e:
logging.error(f"Unhandled exception: {e}", exc_info=True)
return 1
return 0
if __name__ == "__main__":
sys.exit(main())