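# Whshhs / app.py
# Last commit bdfd7a5 by Athspi: "Update app.py" (verified); file size 44 kB.
# NOTE: the lines above were copied from a web file-view header (which also showed
# "raw" / "history blame" links); they are kept as comments so the module parses.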
# File: app.py
import asyncio
import atexit # For cleanup
import base64
import concurrent.futures
import contextlib
import io
import os
import queue
import tempfile
import threading
import time
import traceback
import wave

import cv2
import google.api_core.exceptions
import google.generativeai as genai
import gradio as gr
import mss
import numpy as np
import PIL.Image
import pyaudio
from google.generativeai import types
# --- Constants ---
# Audio format shared by microphone capture and reply formatting.
FORMAT = pyaudio.paInt16
CHANNELS = 1
SEND_SAMPLE_RATE = 16000  # Microphone capture rate
RECEIVE_SAMPLE_RATE = 24000  # Gemini outputs at 24kHz
CHUNK_SIZE = 1024  # Frames per PyAudio buffer

MODEL = "models/gemini-2.0-flash-exp"  # Use the requested experimental model
DEFAULT_VIDEO_MODE = "none"
AVAILABLE_VOICES = ["Puck", "Charon", "Kore", "Fenrir", "Aoede"]
DEFAULT_VOICE = AVAILABLE_VOICES[0]  # "Puck"
SYSTEM_INSTRUCTION_TEXT = "Answer user ask replay same thing user say no other word explain "

# --- Global State ---
audio_loop_instance = None  # The active GradioAudioLoop, if any
background_tasks = set()  # asyncio tasks started for the active session
background_thread = None  # Thread running background_loop
background_loop = None  # Event loop for the background thread
stop_background_loop = False  # Flag to signal loop termination
pya = None  # PyAudio instance, created lazily by initialize_py_audio()
# --- Original AudioLoop Class Methods (Included for potential future use) ---
# Note: We inherit from the original structure for clarity but override key methods
class OriginalAudioLoop:
    """Capture/streaming coroutines (camera, screen, microphone) for a Gemini session.

    Attributes:
        video_mode: "none", "camera" or "screen"; subclasses use it to pick tasks.
        out_queue: queue of ``{"mime_type", "data"}`` dicts to send to Gemini;
            ``None`` until a subclass creates it.
        session: the live Gemini session; every loop below exits once it is falsy.
        audio_stream: the open PyAudio input stream while ``listen_audio`` runs.
    """

    # Bounding box used to downscale images before they are sent.
    _MAX_IMAGE_SIZE = (1024, 1024)

    def __init__(self, video_mode=DEFAULT_VIDEO_MODE):
        self.video_mode = video_mode
        self.out_queue = None  # Queue for data *to* Gemini (mic audio, images)
        self.session = None
        self.audio_stream = None  # Mic input stream

    @classmethod
    def _encode_image(cls, img):
        """Downscale *img* in place to fit ``_MAX_IMAGE_SIZE`` and return it as a
        ``{"mime_type": "image/jpeg", "data": <base64 str>}`` payload."""
        img.thumbnail(cls._MAX_IMAGE_SIZE)
        image_io = io.BytesIO()
        img.save(image_io, format="jpeg")
        return {
            "mime_type": "image/jpeg",
            "data": base64.b64encode(image_io.getvalue()).decode(),
        }

    def _get_frame(self, cap):
        """Read one frame from the OpenCV capture *cap*; return its payload or None."""
        ret, frame = cap.read()
        if not ret:
            return None
        # OpenCV delivers BGR; PIL expects RGB.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return self._encode_image(PIL.Image.fromarray(frame_rgb))

    async def get_frames(self):
        """Push about one camera frame per second onto ``out_queue`` until the session ends."""
        cap = None
        try:
            print("Attempting to open camera...")
            # Opening the device blocks, so do it off the event-loop thread.
            cap = await asyncio.to_thread(cv2.VideoCapture, 0)
            if not cap.isOpened():
                print("Error: Could not open camera.")
                await run_coro_in_background_loop(update_status("Error: Could not open camera."))
                return
            print("Camera opened successfully.")
            while self.session:
                frame = await asyncio.to_thread(self._get_frame, cap)
                if frame is None:
                    await asyncio.sleep(0.1)  # Avoid busy loop on read failures
                    continue
                if self.out_queue:
                    await self.out_queue.put(frame)
                await asyncio.sleep(1.0)  # Send frame every second
            print("get_frames: Session closed, stopping camera task.")
        except asyncio.CancelledError:
            print("get_frames task cancelled.")
        except Exception as e:
            print(f"Error in get_frames: {e}")
            await run_coro_in_background_loop(update_status(f"Camera Error: {e}"))
        finally:
            if cap and cap.isOpened():
                print("Releasing camera.")
                await asyncio.to_thread(cap.release)
            print("Camera task finished.")

    def _grab_monitor(self, index):
        """Capture ``mss`` monitor number *index* and return its encoded payload.

        Raises IndexError when that monitor does not exist.
        """
        with mss.mss() as sct:
            sct_img = sct.grab(sct.monitors[index])
            # mss hands back BGRA pixels; "BGRX" tells PIL to drop the alpha byte.
            img = PIL.Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX")
        return self._encode_image(img)

    def _get_screen(self):
        """Capture the primary monitor, falling back to monitor 0; None on failure."""
        try:
            # Attempt to grab the primary monitor (often index 1 in mss.monitors)
            return self._grab_monitor(1)
        except IndexError:
            print("Error capturing screen: Could not find monitor at index 1. Trying index 0.")
            try:
                # Fallback to monitor 0 (usually includes all screens)
                return self._grab_monitor(0)
            except Exception as e_fallback:
                print(f"Error capturing screen (fallback monitor 0): {e_fallback}")
                return None
        except Exception as e:
            print(f"Error capturing screen: {e}")
            return None

    async def get_screen(self):
        """Push about one screenshot per second onto ``out_queue`` until the session ends."""
        while self.session:
            frame = await asyncio.to_thread(self._get_screen)
            if frame is None:
                print("Warning: Failed to capture screen.")
                await asyncio.sleep(1.0)  # Wait before retrying if error occurred
                continue
            if self.out_queue:
                await self.out_queue.put(frame)
            await asyncio.sleep(1.0)  # Send screen frame every second
        print("get_screen: Session closed, stopping screen task.")

    async def send_realtime(self):
        """Sends microphone audio or video frames from the out_queue to Gemini."""
        while True:
            if not self.session or not self.out_queue:
                # Wait if session/queue not ready or if disconnected
                await asyncio.sleep(0.1)
                if not self.session:
                    print("send_realtime: Session closed, stopping task.")
                    break
                continue
            # Keep a local reference: run_main_loop may reset out_queue to None.
            out_queue = self.out_queue
            try:
                msg = await asyncio.wait_for(out_queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue  # No message in queue, loop again
            except asyncio.CancelledError:
                print("send_realtime task cancelled.")
                break
            try:
                if self.session:  # The session may have closed while we waited
                    await self.session.send(input=msg)
            except asyncio.CancelledError:
                print("send_realtime task cancelled.")
                break
            except Exception as e:
                print(f"Error in send_realtime: {e}")
                await run_coro_in_background_loop(update_status(f"Send Error: {e}"))
                # Avoid continuous errors if session is bad
                if isinstance(e, (google.api_core.exceptions.GoogleAPICallError, ConnectionError)):
                    print("Connection error in send_realtime, pausing...")
                    await asyncio.sleep(5)
            finally:
                # Balance every get(), even when sending failed, so join() cannot hang.
                out_queue.task_done()

    async def listen_audio(self):
        """Listens to microphone and puts audio chunks onto the out_queue."""
        global pya
        if not pya:
            print("Error: PyAudio not initialized in listen_audio.")
            await run_coro_in_background_loop(update_status("Error: Audio system not ready."))
            return
        stream = None
        try:
            print("Attempting to open microphone...")
            mic_info = await asyncio.to_thread(pya.get_default_input_device_info)
            stream = await asyncio.to_thread(
                pya.open,
                format=FORMAT,
                channels=CHANNELS,
                rate=SEND_SAMPLE_RATE,
                input=True,
                input_device_index=mic_info["index"],
                frames_per_buffer=CHUNK_SIZE,
            )
            self.audio_stream = stream  # Store reference for cleanup
            print("Microphone stream opened.")
            # With assertions enabled (__debug__), don't raise on input overflow. Under
            # `python -O`, PyAudio's default applies and the IOError handler absorbs it.
            read_kwargs = {"exception_on_overflow": False} if __debug__ else {}
            while self.session:
                try:
                    data = await asyncio.to_thread(stream.read, CHUNK_SIZE, **read_kwargs)
                    if self.out_queue:
                        await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})
                except IOError:
                    # Buffer overflow or a stream closed underneath us; back off briefly.
                    await asyncio.sleep(0.05)
            print("listen_audio: Session closed, stopping microphone task.")
        except asyncio.CancelledError:
            print("listen_audio task cancelled.")
        except OSError as e:
            print(f"Error opening microphone: {e}. Is a microphone connected and accessible?")
            await run_coro_in_background_loop(update_status(f"Mic Error: {e}"))
        except Exception as e:
            print(f"Error in listen_audio: {e}")
            traceback.print_exc()
            await run_coro_in_background_loop(update_status(f"Mic Error: {e}"))
        finally:
            if stream:
                print("Stopping and closing microphone stream.")
                try:
                    await asyncio.to_thread(stream.stop_stream)
                finally:
                    # Close and drop our reference even if stop_stream() failed.
                    await asyncio.to_thread(stream.close)
                    self.audio_stream = None
                    print("Microphone stream closed.")
# --- Gradio Specific Audio Loop ---
class GradioAudioLoop(OriginalAudioLoop):  # Inherit and modify/add methods
    """Text-in, audio+text-out Gemini Live session driven by the Gradio handlers.

    All coroutines run on the background event loop:
    - ``send_message_and_wait_for_response`` queues text for ``process_text_inputs``.
    - It then waits for ``receive_responses`` to publish the reply on the two response
      queues and set ``response_event``.
    """

    def __init__(self, video_mode=DEFAULT_VIDEO_MODE, api_key=None, voice_name=DEFAULT_VOICE):
        super().__init__(video_mode)
        self.api_key = api_key
        self.voice_name = voice_name
        self.client = None
        self.config = None
        self.connection_status = "Disconnected"  # Internal status
        # Queues for communication between Gradio handler and background loop
        self.text_input_queue = asyncio.Queue()
        self.response_text_queue = asyncio.Queue()
        self.response_audio_queue = asyncio.Queue()
        self.response_event = asyncio.Event()  # Signal when response is ready
        # Buffers for accumulating response data within a turn
        self.current_audio_buffer = io.BytesIO()
        self.current_text_response = ""

    def _initialize_client_and_config(self):
        """Create the Gemini client and the LiveConnectConfig.

        A non-empty ``GEMINI_API_KEY`` environment variable takes precedence over
        the key entered in the UI.

        Returns:
            True on success. False on any failure, including a missing key; in that
            case ``connection_status`` describes the error.
        """
        try:
            # Fall back to the UI key when the env var is unset *or* empty.
            api_key_to_use = os.getenv("GEMINI_API_KEY") or self.api_key
            if not api_key_to_use:
                raise ValueError("No API key provided or found in GEMINI_API_KEY environment variable.")
            print("Initializing Gemini Client...")
            self.client = genai.Client(api_key=api_key_to_use)
            print(f"Setting up LiveConnectConfig with voice: {self.voice_name}")
            self.config = types.LiveConnectConfig(
                # NOTE(review): the Live API docs say only one response modality may be
                # set per session; requesting both may be rejected at connect time. Confirm.
                response_modalities=["audio", "text"],
                speech_config=types.SpeechConfig(
                    voice_config=types.VoiceConfig(
                        prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=self.voice_name)
                    )
                ),
                system_instruction=types.Content(
                    parts=[types.Part.from_text(text=SYSTEM_INSTRUCTION_TEXT)],
                    role="user",
                ),
            )
            print("Gemini client and config initialized successfully.")
            self.connection_status = "Initialized"
            return True
        except Exception as e:
            print(f"Error initializing Gemini client: {e}")
            self.client = None
            self.config = None
            self.connection_status = f"Initialization Error: {e}"
            return False

    async def process_text_inputs(self):
        """Send each queued text to Gemini as a complete turn.

        A ``None`` item stops the task. If the session is unusable, an error reply
        is published at once so the waiting handler is released.
        """
        while True:
            try:
                text_to_send = await self.text_input_queue.get()
            except asyncio.CancelledError:
                print("process_text_inputs task cancelled.")
                break
            try:
                if text_to_send is None:  # Sentinel pushed by disconnect()
                    print("Stopping text input processing.")
                    break
                if self.session and self.connection_status == "Connected":
                    print(f"Sending text to Gemini: {text_to_send[:50]}...")
                    # Reset the turn buffers before the reply starts streaming in.
                    self.current_audio_buffer = io.BytesIO()
                    self.current_text_response = ""
                    self.response_event.clear()
                    await self.session.send(input=text_to_send or ".", end_of_turn=True)
                    print("Text sent, waiting for response...")
                else:
                    print(f"Warning: Cannot send text. Session not active or status is {self.connection_status}.")
                    await self.response_text_queue.put(f"Error: Not connected or connection issue ({self.connection_status}). Cannot send message.")
                    await self.response_audio_queue.put(b"")  # Empty audio
                    self.response_event.set()  # Unblock the handler
            except asyncio.CancelledError:
                print("process_text_inputs task cancelled.")
                break
            except Exception as e:
                print(f"Error in process_text_inputs: {e}")
                await self.response_text_queue.put(f"Error sending message: {e}")
                await self.response_audio_queue.put(b"")
                self.response_event.set()
                await asyncio.sleep(1)  # Avoid a tight loop on continuous errors
            finally:
                # Balance every successful get(), including the None sentinel.
                self.text_input_queue.task_done()

    async def receive_responses(self):
        """Accumulate each Gemini turn and publish it to the waiting Gradio handler."""
        while True:
            if not self.session or self.connection_status != "Connected":
                await asyncio.sleep(0.2)
                if not self.session:  # Check if disconnected while waiting
                    print("receive_responses: Session closed, stopping task.")
                    break
                continue
            try:
                # The async-for ends when the model's turn is complete.
                async for response in self.session.receive():
                    if data := response.data:
                        self.current_audio_buffer.write(data)
                    if text := response.text:
                        self.current_text_response += text
                await self.response_audio_queue.put(self.current_audio_buffer.getvalue())
                await self.response_text_queue.put(self.current_text_response)
                self.response_event.set()  # Response is ready for the Gradio handler
                # Start the next turn from empty buffers, even if it was not
                # triggered by process_text_inputs (which also resets them).
                self.current_audio_buffer = io.BytesIO()
                self.current_text_response = ""
            except asyncio.CancelledError:
                print("receive_responses task cancelled.")
                break
            except google.api_core.exceptions.Cancelled:
                print("Gemini receive cancelled (likely due to interruption or end)")
                # Signal completion even if cancelled externally
                await self.response_audio_queue.put(self.current_audio_buffer.getvalue())
                await self.response_text_queue.put(self.current_text_response + " [Receive Cancelled]")
                self.response_event.set()
            except Exception as e:
                print(f"Error receiving responses: {e}")
                traceback.print_exc()
                # Signal completion with error to unblock handler
                await self.response_audio_queue.put(b"")
                await self.response_text_queue.put(f"Error receiving response: {e}")
                self.response_event.set()
                if isinstance(e, (google.api_core.exceptions.GoogleAPICallError, ConnectionError)):
                    print("Connection error in receive_responses, pausing...")
                    self.connection_status = f"Receive Error: {e}"  # Ends run_main_loop's wait
                    await asyncio.sleep(5)

    def _drain_response_queues(self):
        """Discard any unread replies left in the response queues."""
        for pending_queue in (self.response_audio_queue, self.response_text_queue):
            while True:
                try:
                    pending_queue.get_nowait()
                except asyncio.QueueEmpty:
                    break
                pending_queue.task_done()

    async def send_message_and_wait_for_response(self, text):
        """Send *text* to Gemini and wait up to 60 s for the reply.

        Returns:
            ``(text_response, audio_bytes)`` on success, ``(error_message, None)``
            otherwise.
        """
        if not self.session or self.connection_status != "Connected":
            return f"Error: Not connected ({self.connection_status}).", None
        # Drop replies left from earlier turns (e.g. one that arrived after a previous
        # timeout), and re-arm the event *before* queueing the text. Otherwise the event
        # still set by the last reply would release the wait below immediately.
        self._drain_response_queues()
        self.response_event.clear()
        await self.text_input_queue.put(text)
        print("Waiting for response event...")
        try:
            await asyncio.wait_for(self.response_event.wait(), timeout=60.0)  # 60 second timeout
            print("Response event received.")
        except asyncio.TimeoutError:
            print("Timeout waiting for Gemini response.")
            return "Error: Timeout waiting for response.", None
        except Exception as e:
            print(f"Error waiting for response event: {e}")
            return f"Error waiting for response: {e}", None
        # The event is set only after both queues received an item for this turn.
        try:
            audio_data = self.response_audio_queue.get_nowait()
            text_response = self.response_text_queue.get_nowait()
            self.response_audio_queue.task_done()
            self.response_text_queue.task_done()
        except asyncio.QueueEmpty:
            print("Error: Response queues were empty after event was set.")
            return "Internal Error: Response queues empty.", None
        except Exception as e:
            print(f"Error retrieving from response queues: {e}")
            return f"Internal Error: {e}", None
        return text_response, audio_data

    def _start_worker_tasks(self):
        """Create the per-session worker tasks selected by ``video_mode``; return them."""
        tasks = {
            asyncio.create_task(self.process_text_inputs(), name="process_text_inputs"),
            asyncio.create_task(self.receive_responses(), name="receive_responses"),
        }
        if self.video_mode != "none":
            tasks.add(asyncio.create_task(self.send_realtime(), name="send_realtime"))
            if self.video_mode == "camera":
                print("Starting camera input task...")
                tasks.add(asyncio.create_task(self.get_frames(), name="get_frames"))
            elif self.video_mode == "screen":
                print("Starting screen capture task...")
                tasks.add(asyncio.create_task(self.get_screen(), name="get_screen"))
        # Microphone input (listen_audio) could be added here alongside video.
        return tasks

    @staticmethod
    async def _cancel_tasks(tasks):
        """Cancel *tasks* and wait for them to finish unwinding."""
        for task in tasks:
            if not task.done():
                task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

    async def run_main_loop(self):
        """Connect to Gemini, run the worker tasks until disconnected, then clean up.

        Outcomes are reported through ``connection_status`` and ``update_status``
        rather than raised.
        """
        global background_tasks
        if not self._initialize_client_and_config():
            print("Initialization failed, cannot connect.")
            self.connection_status = "Connection Failed: Initialization error."
            await run_coro_in_background_loop(update_status(self.connection_status))
            return
        try:
            print(f"Attempting to connect to Gemini model: {MODEL}...")
            self.connection_status = "Connecting..."
            await run_coro_in_background_loop(update_status(self.connection_status))
            try:
                async with contextlib.AsyncExitStack() as stack:
                    # live.connect() returns an async context manager, not an awaitable,
                    # so it cannot be wrapped by asyncio.wait_for directly. Time-limit
                    # only the handshake (entering the context); the stack closes it.
                    session = await asyncio.wait_for(
                        stack.enter_async_context(
                            self.client.aio.live.connect(model=MODEL, config=self.config)
                        ),
                        timeout=30.0,  # 30 second timeout for connection
                    )
                    self.session = session
                    self.connection_status = "Connected"
                    print("Session established successfully.")
                    await run_coro_in_background_loop(update_status(self.connection_status))
                    # Queue for mic/video data TO Gemini
                    self.out_queue = asyncio.Queue(maxsize=20)
                    tasks = self._start_worker_tasks()
                    background_tasks.update(tasks)
                    try:
                        # receive_responses/disconnect change these when the session ends.
                        while self.connection_status == "Connected" and self.session:
                            await asyncio.sleep(0.5)
                    finally:
                        # Stop the workers while the session is still open, so none
                        # of them touches a closed connection.
                        await self._cancel_tasks(tasks)
                print("Exiting main run loop (disconnected or error).")
            except asyncio.TimeoutError:
                print("CONNECTION FAILED: Timeout while trying to connect.")
                self.connection_status = "Connection Failed: Timeout"
                await run_coro_in_background_loop(update_status(self.connection_status))
            except google.api_core.exceptions.PermissionDenied as e:
                print(f"CONNECTION FAILED: Permission Denied. Check API key and permissions. {e}")
                self.connection_status = "Connection Failed: Permission Denied"
                await run_coro_in_background_loop(update_status(f"{self.connection_status}. Check API Key."))
            except google.api_core.exceptions.InvalidArgument as e:
                print(f"CONNECTION FAILED: Invalid Argument. Check model name ('{MODEL}') and config. {e}")
                self.connection_status = "Connection Failed: Invalid Argument (Model/Config?)"
                await run_coro_in_background_loop(update_status(f"{self.connection_status} Details: {e}"))
            except Exception as e:
                print(f"CONNECTION FAILED: An unexpected error occurred during connection. {e}")
                traceback.print_exc()
                self.connection_status = f"Connection Failed: {e}"
                await run_coro_in_background_loop(update_status(self.connection_status))
        except asyncio.CancelledError:
            print("run_main_loop task cancelled.")
            self.connection_status = "Disconnected (Cancelled)"
        except Exception as e:
            print(f"Error in AudioLoop run_main_loop: {e}")
            traceback.print_exc()
            self.connection_status = f"Runtime Error: {e}"
            await run_coro_in_background_loop(update_status(self.connection_status))
        finally:
            print("Cleaning up audio loop resources...")
            final_status = self.connection_status
            if final_status in ("Connected", "Disconnecting"):
                # A clean exit or a requested disconnect, not an error.
                final_status = "Disconnected"
                self.connection_status = "Disconnected"
            # Cancel any remaining tasks registered for this instance.
            tasks_to_cancel = list(background_tasks)
            background_tasks.clear()
            for task in tasks_to_cancel:
                if task and not task.done():
                    task.cancel()
                    print(f"Cancelled task: {task.get_name()}")
            # The microphone stream, if any, is closed by listen_audio's own finally.
            self.session = None  # Important to signal disconnection
            self.client = None
            self.out_queue = None
            print("AudioLoop run finished.")
            await run_coro_in_background_loop(update_status(final_status))

    async def disconnect(self):
        """Request disconnection; ``run_main_loop`` notices and performs the cleanup."""
        print("Disconnect requested.")
        if self.session:
            print("Setting status to Disconnecting...")
            self.connection_status = "Disconnecting"
            # Clearing the session makes the worker loops and run_main_loop's wait exit.
            self.session = None
            # Wake process_text_inputs if it is blocked on an empty queue.
            await self.text_input_queue.put(None)
        else:
            self.connection_status = "Disconnected"
        await run_coro_in_background_loop(update_status("Disconnected"))
# --- Helper Functions ---
def start_asyncio_loop():
    """Thread target: create the background event loop and run it until told to stop.

    The loop is published in the module-level ``background_loop``. Once
    ``stop_background_loop`` becomes true, the shutdown does the following:
    - every remaining task is cancelled and awaited (the worker coroutines loop forever
      and would never finish on their own);
    - async generators are finalized;
    - the loop is closed and ``background_loop`` is reset to None.
    """
    global background_loop, stop_background_loop
    stop_background_loop = False
    loop = asyncio.new_event_loop()
    background_loop = loop
    asyncio.set_event_loop(loop)
    print("Background asyncio loop starting.")
    try:
        # Run in ~0.1 s slices so the stop flag is noticed even without an explicit stop().
        while not stop_background_loop:
            wake_up = loop.call_later(0.1, loop.stop)
            loop.run_forever()
            # If an external stop() ended this slice early, drop the pending wake-up so
            # it cannot interrupt the shutdown steps below.
            wake_up.cancel()
        print("Stop signal received, exiting run_forever loop.")
        print("Cancelling pending tasks before closing loop...")
        pending = asyncio.all_tasks(loop=loop)
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        loop.run_until_complete(loop.shutdown_asyncgens())
    except Exception as e:
        print(f"Error in background loop: {e}")
        traceback.print_exc()
    finally:
        print("Closing background loop...")
        loop.close()
        print("Background asyncio loop stopped.")
        background_loop = None  # Clear global ref
def stop_asyncio_loop():
    """Ask the background loop thread to shut down; returns without waiting for it."""
    global stop_background_loop, background_loop
    print("Signalling background loop to stop...")
    stop_background_loop = True
    loop = background_loop
    if not loop or not loop.is_running():
        # The thread notices the flag at the end of its current run slice.
        return
    # Wake the loop right away instead of waiting for the slice to end.
    loop.call_soon_threadsafe(loop.stop)
async def run_coro_in_background_loop(coro):
    """Schedule *coro* on the background event loop without waiting for it to finish.

    This is a coroutine function, so it must be awaited from code that is already
    running on an event loop. The await returns as soon as *coro* has been scheduled.
    Synchronous callers must use ``asyncio.run_coroutine_threadsafe`` instead; calling
    this from them only creates a coroutine object that never runs.

    Returns:
        The ``concurrent.futures.Future`` for *coro*, or None if it could not be
        scheduled. In that case *coro* is closed so it does not trigger a
        "coroutine was never awaited" warning.
    """
    loop = background_loop
    # The loop runs in short run_forever() slices, so is_running() can be briefly
    # False while it is healthy; run_coroutine_threadsafe only needs it open.
    if loop is None or loop.is_closed() or stop_background_loop:
        print("Error: Background asyncio loop not running or stopping.")
        coro.close()
        return None
    try:
        return asyncio.run_coroutine_threadsafe(coro, loop)
    except RuntimeError as e:
        # Raised e.g. when the loop was closed between the check and the call.
        print(f"Error submitting coroutine (loop shutting down?): {e}")
    except Exception as e:
        print(f"Unexpected error submitting coroutine: {e}")
    coro.close()
    return None
def format_audio_for_gradio(pcm_data):
    """Convert raw PCM from Gemini into the ``(sample_rate, samples)`` tuple that
    ``gr.Audio(type="numpy")`` accepts.

    The bytes are read as 16-bit signed samples (the width of ``FORMAT``, paInt16)
    at ``RECEIVE_SAMPLE_RATE``. A trailing partial frame is dropped.

    Returns:
        ``(RECEIVE_SAMPLE_RATE, numpy int16 array)``, or None when *pcm_data* is
        empty or cannot be converted.
    """
    if not pcm_data:
        return None
    try:
        # Decode the samples directly. Round-tripping through an in-memory WAV file,
        # and initialising PyAudio just to learn the sample width, added nothing.
        sample_width = np.dtype(np.int16).itemsize  # 2 bytes, as for paInt16
        n_frames = len(pcm_data) // (sample_width * CHANNELS)
        samples = np.frombuffer(pcm_data, dtype=np.int16, count=n_frames * CHANNELS)
        return (RECEIVE_SAMPLE_RATE, samples)
    except Exception as e:
        print(f"Error formatting audio: {e}")
        traceback.print_exc()
        return None
def initialize_py_audio():
    """Create the shared PyAudio instance on first use.

    Returns:
        True if the module-level ``pya`` is available (already or newly created),
        False if PyAudio failed to start.
    """
    global pya
    if pya is not None:
        return True  # Already initialized
    try:
        print("Initializing PyAudio...")
        pya = pyaudio.PyAudio()
        print("PyAudio initialized.")
    except Exception as e:
        print(f"Failed to initialize PyAudio: {e}")
        pya = None
        return False
    return True
def terminate_py_audio():
    """Release the shared PyAudio instance, if any; ``pya`` is None afterwards."""
    global pya
    if not pya:
        return
    print("Terminating PyAudio...")
    try:
        pya.terminate()
    except Exception as e:
        print(f"Error terminating PyAudio: {e}")
    finally:
        pya = None
    print("PyAudio terminated.")
# --- Gradio Interface and Handlers ---
async def update_status(new_status: str):
    """Log a status change reported from the background loop.

    This coroutine cannot push to Gradio components. The Gradio handlers read
    ``connection_status`` and yield it themselves, so this only prints the new
    status for the server log.
    """
    log_line = f"Status Update (async): {new_status}"
    print(log_line)
def handle_connect(api_key, voice_name, video_mode):
    """Gradio generator behind the Connect button.

    It does the following:
    - starts the background event-loop thread if needed;
    - creates a new ``GradioAudioLoop`` and schedules its ``run_main_loop`` there.

    Yields:
        ``(status, response_text, audio)`` triples. The last two are always None,
        which clears the response widgets.
    """
    global audio_loop_instance, background_loop, background_thread
    print("\n--- Connect Button Clicked ---")
    yield "Connecting...", None, None  # Initial status update
    if not api_key:
        yield "Error: Please enter a Gemini API key.", None, None
        return
    # Refuse only while a session is live or being set up. Failure statuses carry
    # free-form details (e.g. "Connection Failed: <error>"), so an allow-list of
    # exact "safe" statuses would lock the user out after most failures.
    busy_statuses = ("Initialized", "Connecting...", "Connected", "Disconnecting")
    if audio_loop_instance and audio_loop_instance.connection_status in busy_statuses:
        yield f"Already connected or connecting ({audio_loop_instance.connection_status}). Disconnect first.", None, None
        return
    # Start background loop thread if not running
    if not background_thread or not background_thread.is_alive():
        print("Starting background thread...")
        background_thread = threading.Thread(target=start_asyncio_loop, daemon=True)
        background_thread.start()
        # Wait (bounded) for the thread to publish its event loop.
        deadline = time.monotonic() + 2.0
        while background_loop is None and time.monotonic() < deadline:
            time.sleep(0.05)
    if not initialize_py_audio():
        yield "Error: Failed to initialize audio system.", None, None
        return
    print(f"Attempting to connect with voice: {voice_name}, video: {video_mode}")
    audio_loop_instance = GradioAudioLoop(video_mode=video_mode, api_key=api_key, voice_name=voice_name)
    # This handler is synchronous, so submit directly with run_coroutine_threadsafe.
    # Calling the async run_coro_in_background_loop here would only create a
    # coroutine object that never runs.
    loop = background_loop
    connect_future = None
    if loop is not None and not loop.is_closed():
        main_coro = audio_loop_instance.run_main_loop()
        try:
            connect_future = asyncio.run_coroutine_threadsafe(main_coro, loop)
        except RuntimeError as e:
            main_coro.close()
            print(f"Error submitting coroutine (loop shutting down?): {e}")
    if connect_future is None:
        audio_loop_instance = None  # Cleanup if scheduling failed
        yield "Error: Failed to schedule connection task.", None, None
        return
    # Give the connection attempt a moment, then report the status it has reached.
    # Later transitions are logged by run_main_loop itself.
    time.sleep(1.5)
    if audio_loop_instance:
        yield audio_loop_instance.connection_status, None, None
    else:
        # Another handler (e.g. Disconnect) cleared the instance meanwhile.
        yield "Error: Connection process failed unexpectedly.", None, None
def handle_disconnect():
    """Gradio generator behind the Disconnect button.

    Schedules ``GradioAudioLoop.disconnect`` on the background loop and waits up to
    5 s for it. The global instance is dropped once the request has been issued.

    Yields:
        ``(status, response_text, audio)`` triples with the last two always None.
    """
    global audio_loop_instance
    print("\n--- Disconnect Button Clicked ---")
    yield "Disconnecting...", None, None  # Initial status update
    if not audio_loop_instance or audio_loop_instance.connection_status == "Disconnected":
        yield "Already disconnected.", None, None
        return
    # Synchronous context: submit directly (run_coro_in_background_loop is a
    # coroutine function and would only return an un-run coroutine here).
    loop = background_loop
    disconnect_future = None
    if loop is not None and not loop.is_closed():
        disconnect_coro = audio_loop_instance.disconnect()
        try:
            disconnect_future = asyncio.run_coroutine_threadsafe(disconnect_coro, loop)
        except RuntimeError as e:
            disconnect_coro.close()
            print(f"Error submitting coroutine (loop shutting down?): {e}")
    if disconnect_future is None:
        yield "Error: Failed to schedule disconnection task.", None, None
        return
    try:
        disconnect_future.result(timeout=5.0)
        status = "Disconnected"
    except concurrent.futures.TimeoutError:
        # Before Python 3.11 this is not the builtin TimeoutError.
        status = "Disconnect timeout. Check logs."
        print("Timeout waiting for disconnect confirmation.")
    except Exception as e:
        status = f"Error during disconnect: {e}"
        print(f"Error during disconnect future result: {e}")
    audio_loop_instance = None
    # PyAudio is left running so a quick reconnect works; cleanup() terminates it.
    yield status, None, None  # Final status update
def handle_send_message(message):
    """Gradio generator behind Send / Enter: send *message* and show the reply.

    Yields:
        ``(response_text, audio)`` pairs. ``audio`` is the
        ``(sample_rate, samples)`` tuple from ``format_audio_for_gradio``, or None.
    """
    global audio_loop_instance
    print(f"\n--- Sending Message: {(message or '')[:30]}... ---")
    if not audio_loop_instance or audio_loop_instance.connection_status != "Connected":
        yield "Error: Not connected. Cannot send message.", None
        return
    if not message or not message.strip():
        yield "Cannot send empty message.", None
        return
    yield "Sending message...", None  # Update status, clear audio
    # Synchronous context: submit directly (run_coro_in_background_loop is a
    # coroutine function and would only return an un-run coroutine here).
    loop = background_loop
    response_future = None
    if loop is not None and not loop.is_closed():
        send_coro = audio_loop_instance.send_message_and_wait_for_response(message)
        try:
            response_future = asyncio.run_coroutine_threadsafe(send_coro, loop)
        except RuntimeError as e:
            send_coro.close()
            print(f"Error submitting coroutine (loop shutting down?): {e}")
    if response_future is None:
        yield "Error: Failed to schedule message task.", None
        return
    text_response = "Error: No response received."
    audio_result = None
    try:
        # Wait slightly longer than the coroutine's own 60 s limit, so that its
        # timeout message is what reaches the user.
        result_text, result_audio_data = response_future.result(timeout=65.0)
        text_response = result_text
        if result_audio_data:
            print(f"Received audio data ({len(result_audio_data)} bytes), formatting...")
            audio_result = format_audio_for_gradio(result_audio_data)
            if audio_result is None:
                print("Failed to format audio for Gradio.")
                text_response += " [Audio Formatting Error]"
        else:
            print("No audio data received in response.")
            text_response += " [No Audio Received]"
    except concurrent.futures.TimeoutError:
        # Before Python 3.11 this is not the builtin TimeoutError.
        print("Timeout waiting for response future.")
        text_response = "Error: Timeout waiting for Gemini response."
        response_future.cancel()  # Stop the orphaned wait on the background loop
    except Exception as e:
        print(f"Error getting result from response future: {e}")
        traceback.print_exc()
        text_response = f"Error processing response: {e}"
    print(f"Final Text Response: {text_response}")
    print(f"Final Audio Output: {'Present' if audio_result else 'None'}")
    yield text_response, audio_result
# --- Gradio Interface Definition ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Gemini LiveConnect TTS Interface")
    gr.Markdown(f"Using Model: `{MODEL}`")

    # Connection settings.
    with gr.Row():
        api_key_input = gr.Textbox(
            label="Gemini API Key",
            type="password",
            placeholder="Enter your API key",
        )
        voice_select = gr.Dropdown(
            label="Select Voice",
            choices=AVAILABLE_VOICES,
            value=DEFAULT_VOICE,
        )
        # Hidden for now: the interface focuses on TTS.
        video_mode_select = gr.Radio(
            label="Video Input (Optional)",
            choices=["none", "camera", "screen"],
            value=DEFAULT_VIDEO_MODE,
            visible=False,
        )

    with gr.Row():
        connect_button = gr.Button("Connect")
        disconnect_button = gr.Button("Disconnect")
    status_output = gr.Textbox(label="Status", value="Disconnected", interactive=False)

    # Message entry.
    with gr.Column():
        message_input = gr.Textbox(label="Your Message", placeholder="Type your message here...")
        send_button = gr.Button("Send Message")

    # Model reply: text plus a numpy (sample_rate, samples) audio clip.
    with gr.Column():
        gr.Markdown("## Response")
        response_text_output = gr.Textbox(label="Gemini Text", interactive=False)
        audio_output = gr.Audio(label="Gemini Audio", type="numpy", interactive=False)

    # --- Event Handlers ---
    # Connect / Disconnect report status and clear the previous response.
    connect_button.click(
        fn=handle_connect,
        inputs=[api_key_input, voice_select, video_mode_select],
        outputs=[status_output, response_text_output, audio_output],
    )
    disconnect_button.click(
        fn=handle_disconnect,
        inputs=[],
        outputs=[status_output, response_text_output, audio_output],
    )
    # The Send button and pressing Enter in the textbox share one handler.
    for send_trigger in (send_button.click, message_input.submit):
        send_trigger(
            fn=handle_send_message,
            inputs=[message_input],
            outputs=[response_text_output, audio_output],
        )
# --- Cleanup Function ---
def cleanup():
    """atexit hook that shuts the app down.

    Steps:
    - disconnect any live session;
    - stop and join the loop thread;
    - release PyAudio.
    """
    global audio_loop_instance
    print("Running cleanup...")
    instance = audio_loop_instance
    if instance and instance.connection_status != "Disconnected":
        print("Disconnecting during cleanup...")
        loop = background_loop
        if loop is not None and not loop.is_closed():
            # Synchronous context: submit directly (run_coro_in_background_loop is a
            # coroutine function and would only return an un-run coroutine here).
            disconnect_coro = instance.disconnect()
            try:
                disconnect_future = asyncio.run_coroutine_threadsafe(disconnect_coro, loop)
            except RuntimeError as e:
                disconnect_coro.close()
                print(f"Error during cleanup disconnect: {e}")
            else:
                try:
                    disconnect_future.result(timeout=5.0)
                    print("Disconnect successful during cleanup.")
                except Exception as e:
                    print(f"Error during cleanup disconnect: {e}")
    audio_loop_instance = None
    # Signal background loop to stop, then wait for its thread to finish.
    stop_asyncio_loop()
    if background_thread and background_thread.is_alive():
        print("Waiting for background thread to join...")
        background_thread.join(timeout=5.0)
        if background_thread.is_alive():
            print("Warning: Background thread did not exit cleanly.")
    terminate_py_audio()
    print("Cleanup finished.")


# Register cleanup function to run on exit
atexit.register(cleanup)
# --- Main Execution ---
if __name__ == "__main__":
    # The background asyncio thread is started lazily by handle_connect.
    print("Launching Gradio Interface...")
    # Enable request queuing and serve locally (share=True would create a public link).
    demo.queue().launch(share=False)
    # launch() blocks until the server stops; cleanup then runs via atexit.
    print("Gradio Interface closed.")