|
|
|
|
|
import gradio as gr |
|
import asyncio |
|
import base64 |
|
import io |
|
import traceback |
|
import cv2 |
|
import pyaudio |
|
import PIL.Image |
|
import mss |
|
import google.generativeai as genai |
|
from google.generativeai import types |
|
import google.api_core.exceptions |
|
import wave |
|
import numpy as np |
|
import threading |
|
import queue |
|
import os |
|
import time |
|
import tempfile |
|
import atexit |
|
|
|
|
|
# --- Audio parameters -------------------------------------------------------
FORMAT = pyaudio.paInt16
CHANNELS = 1
SEND_SAMPLE_RATE = 16000  # rate used when opening the microphone stream
RECEIVE_SAMPLE_RATE = 24000  # rate reported for audio handed to Gradio
CHUNK_SIZE = 1024  # frames per microphone read

# --- Gemini session defaults ------------------------------------------------
MODEL = "models/gemini-2.0-flash-exp"
DEFAULT_VIDEO_MODE = "none"
AVAILABLE_VOICES = ["Puck", "Charon", "Kore", "Fenrir", "Aoede"]
DEFAULT_VOICE = "Puck"
SYSTEM_INSTRUCTION_TEXT = "Answer user ask replay same thing user say no other word explain "
|
|
|
|
|
# --- Shared runtime state ---------------------------------------------------
audio_loop_instance = None  # the current GradioAudioLoop, if any
background_tasks = set()  # asyncio tasks spawned for the current session
background_loop = None  # event loop owned by the background thread
pya = None  # lazily created pyaudio.PyAudio instance
background_thread = None  # thread running start_asyncio_loop
stop_background_loop = False  # set to True to ask the background loop to exit
|
|
|
|
|
|
|
class OriginalAudioLoop:
    """Capture helpers (camera, screen, microphone) feeding a live session.

    Base class structure placeholder - includes relevant methods from the
    original script. The capture coroutines push payload dicts onto
    ``out_queue`` and stop once ``session`` becomes falsy; both attributes
    start as ``None`` and must be populated by whoever drives the tasks.
    """

    def __init__(self, video_mode=DEFAULT_VIDEO_MODE):
        self.video_mode = video_mode
        self.out_queue = None
        self.session = None
        self.audio_stream = None

    @staticmethod
    def _encode_jpeg(img):
        """Shrink *img* in place to fit 1024x1024 and return a JPEG payload.

        Returns a dict with ``mime_type`` ("image/jpeg") and ``data`` (the
        base64-encoded JPEG bytes as ``str``).
        """
        img.thumbnail([1024, 1024])
        image_io = io.BytesIO()
        img.save(image_io, format="jpeg")
        return {
            "mime_type": "image/jpeg",
            "data": base64.b64encode(image_io.getvalue()).decode(),
        }

    def _get_frame(self, cap):
        """Read one frame from *cap*; return a JPEG payload or ``None``."""
        ret, frame = cap.read()
        if not ret:
            return None
        # OpenCV delivers BGR; PIL expects RGB.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return self._encode_jpeg(PIL.Image.fromarray(frame_rgb))

    async def get_frames(self):
        """Push roughly one camera frame per second onto ``out_queue``."""
        cap = None
        try:
            print("Attempting to open camera...")
            cap = await asyncio.to_thread(cv2.VideoCapture, 0)
            if not cap.isOpened():
                print("Error: Could not open camera.")
                await run_coro_in_background_loop(update_status("Error: Could not open camera."))
                return

            print("Camera opened successfully.")
            while True:
                if not self.session:
                    print("get_frames: Session closed, stopping camera task.")
                    break

                frame = await asyncio.to_thread(self._get_frame, cap)
                if frame is None:
                    # Failed read: back off briefly and retry.
                    await asyncio.sleep(0.1)
                    continue

                if self.out_queue:
                    await self.out_queue.put(frame)
                await asyncio.sleep(1.0)

        except asyncio.CancelledError:
            print("get_frames task cancelled.")
        except Exception as e:
            print(f"Error in get_frames: {e}")
            await run_coro_in_background_loop(update_status(f"Camera Error: {e}"))
        finally:
            if cap and cap.isOpened():
                print("Releasing camera.")
                await asyncio.to_thread(cap.release)
            print("Camera task finished.")

    def _grab_monitor(self, index):
        """Capture ``mss`` monitor *index* and return it as a JPEG payload.

        Raises ``IndexError`` when the monitor does not exist; any other
        capture/encoding error propagates unchanged.
        """
        with mss.mss() as sct:
            monitor = sct.monitors[index]
            sct_img = sct.grab(monitor)
            img = PIL.Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX")
            return self._encode_jpeg(img)

    def _get_screen(self):
        """Capture monitor 1, falling back to monitor 0; ``None`` on failure."""
        try:
            return self._grab_monitor(1)
        except IndexError:
            print("Error capturing screen: Could not find monitor at index 1. Trying index 0.")
            try:
                return self._grab_monitor(0)
            except Exception as e_fallback:
                print(f"Error capturing screen (fallback monitor 0): {e_fallback}")
                return None
        except Exception as e:
            print(f"Error capturing screen: {e}")
            return None

    async def get_screen(self):
        """Push roughly one screenshot per second onto ``out_queue``."""
        while True:
            if not self.session:
                print("get_screen: Session closed, stopping screen task.")
                break

            frame = await asyncio.to_thread(self._get_screen)
            if frame is None:
                print("Warning: Failed to capture screen.")
                await asyncio.sleep(1.0)
                continue

            if self.out_queue:
                await self.out_queue.put(frame)

            await asyncio.sleep(1.0)

    async def send_realtime(self):
        """Sends microphone audio or video frames from the out_queue to Gemini."""
        while True:
            if not self.session or not self.out_queue:
                await asyncio.sleep(0.1)
                if not self.session:
                    print("send_realtime: Session closed, stopping task.")
                    break
                continue

            try:
                # The timeout lets the loop re-check the session regularly.
                msg = await asyncio.wait_for(self.out_queue.get(), timeout=1.0)
                if self.session:
                    await self.session.send(input=msg)
                self.out_queue.task_done()
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                print("send_realtime task cancelled.")
                break
            except Exception as e:
                print(f"Error in send_realtime: {e}")
                await run_coro_in_background_loop(update_status(f"Send Error: {e}"))

                if isinstance(e, (google.api_core.exceptions.GoogleAPICallError, ConnectionError)):
                    print("Connection error in send_realtime, pausing...")
                    await asyncio.sleep(5)

    async def listen_audio(self):
        """Listens to microphone and puts audio chunks onto the out_queue."""
        if not pya:
            print("Error: PyAudio not initialized in listen_audio.")
            await run_coro_in_background_loop(update_status("Error: Audio system not ready."))
            return

        mic_info = None
        stream = None
        try:
            print("Attempting to open microphone...")
            mic_info = await asyncio.to_thread(pya.get_default_input_device_info)
            stream = await asyncio.to_thread(
                pya.open,
                format=FORMAT,
                channels=CHANNELS,
                rate=SEND_SAMPLE_RATE,
                input=True,
                input_device_index=mic_info["index"],
                frames_per_buffer=CHUNK_SIZE,
            )
            self.audio_stream = stream
            print("Microphone stream opened.")
            if __debug__:
                kwargs = {"exception_on_overflow": False}
            else:
                kwargs = {}

            while True:
                if not self.session:
                    print("listen_audio: Session closed, stopping microphone task.")
                    break
                try:
                    data = await asyncio.to_thread(stream.read, CHUNK_SIZE, **kwargs)
                    if self.out_queue:
                        await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})
                except IOError:
                    # Deliberately best-effort: skip the failed read and retry
                    # shortly instead of ending the capture task.
                    await asyncio.sleep(0.05)
                except asyncio.CancelledError:
                    print("listen_audio task cancelled.")
                    break

        except OSError as e:
            print(f"Error opening microphone: {e}. Is a microphone connected and accessible?")
            await run_coro_in_background_loop(update_status(f"Mic Error: {e}"))
        except Exception as e:
            print(f"Error in listen_audio: {e}")
            traceback.print_exc()
            await run_coro_in_background_loop(update_status(f"Mic Error: {e}"))
        finally:
            if stream:
                print("Stopping and closing microphone stream.")
                await asyncio.to_thread(stream.stop_stream)
                await asyncio.to_thread(stream.close)
            self.audio_stream = None
            print("Microphone stream closed.")
|
|
|
|
|
|
|
class GradioAudioLoop(OriginalAudioLoop):
    """Gemini live session that exchanges text requests for text/audio replies.

    ``run_main_loop`` owns the connection and its worker tasks. Callers submit
    text through ``send_message_and_wait_for_response`` and end the session
    with ``disconnect``. ``connection_status`` is a human-readable state
    string ("Disconnected", "Initialized", "Connecting...", "Connected",
    "Disconnecting", or an error description).
    """

    def __init__(self, video_mode=DEFAULT_VIDEO_MODE, api_key=None, voice_name=DEFAULT_VOICE):
        super().__init__(video_mode)
        self.api_key = api_key
        self.voice_name = voice_name
        self.client = None
        self.config = None
        self.connection_status = "Disconnected"

        # Request/response plumbing between the caller and the worker tasks.
        self.text_input_queue = asyncio.Queue()
        self.response_text_queue = asyncio.Queue()
        self.response_audio_queue = asyncio.Queue()
        self.response_event = asyncio.Event()

        # Accumulators for the reply currently being received.
        self.current_audio_buffer = io.BytesIO()
        self.current_text_response = ""

    def _initialize_client_and_config(self):
        """Initialize Gemini client and configuration.

        Returns ``True`` on success. On failure the error is stored in
        ``connection_status`` and ``False`` is returned. Raises ``ValueError``
        when ``api_key`` was never set.
        """
        if not self.api_key:
            raise ValueError("API key is not set.")
        try:
            # A GEMINI_API_KEY environment variable takes precedence over the
            # key passed to the constructor.
            api_key_to_use = os.getenv("GEMINI_API_KEY", self.api_key)
            if not api_key_to_use:
                raise ValueError("No API key provided or found in GEMINI_API_KEY environment variable.")

            # NOTE(review): `genai.Client` and `types.LiveConnectConfig` appear
            # to belong to the `google-genai` SDK (`from google import genai`),
            # not to `google.generativeai` as imported by this file - confirm
            # the import before relying on this path.
            print("Initializing Gemini Client...")
            self.client = genai.Client(api_key=api_key_to_use)

            print(f"Setting up LiveConnectConfig with voice: {self.voice_name}")
            self.config = types.LiveConnectConfig(
                response_modalities=["audio", "text"],
                speech_config=types.SpeechConfig(
                    voice_config=types.VoiceConfig(
                        prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=self.voice_name)
                    )
                ),
                system_instruction=types.Content(
                    parts=[types.Part.from_text(text=SYSTEM_INSTRUCTION_TEXT)],
                    role="user",
                ),
            )
            print("Gemini client and config initialized successfully.")
            self.connection_status = "Initialized"
            return True
        except Exception as e:
            print(f"Error initializing Gemini client: {e}")
            self.client = None
            self.config = None
            self.connection_status = f"Initialization Error: {e}"
            return False

    def _discard_stale_responses(self):
        """Drop replies left in the response queues by an earlier request."""
        for pending in (self.response_audio_queue, self.response_text_queue):
            while not pending.empty():
                pending.get_nowait()
                pending.task_done()

    async def process_text_inputs(self):
        """Task to wait for text input from Gradio and send it to Gemini.

        A ``None`` item on ``text_input_queue`` stops the task.
        """
        while True:
            try:
                text_to_send = await self.text_input_queue.get()

                if text_to_send is None:
                    print("Stopping text input processing.")
                    break

                if self.session and self.connection_status == "Connected":
                    print(f"Sending text to Gemini: {text_to_send[:50]}...")

                    # Start a fresh reply before the request goes out.
                    self.current_audio_buffer = io.BytesIO()
                    self.current_text_response = ""
                    self.response_event.clear()

                    await self.session.send(input=text_to_send or ".", end_of_turn=True)
                    print("Text sent, waiting for response...")
                else:
                    print(f"Warning: Cannot send text. Session not active or status is {self.connection_status}.")

                    await self.response_text_queue.put(f"Error: Not connected or connection issue ({self.connection_status}). Cannot send message.")
                    await self.response_audio_queue.put(b"")
                    self.response_event.set()

                self.text_input_queue.task_done()

            except asyncio.CancelledError:
                print("process_text_inputs task cancelled.")
                break
            except Exception as e:
                print(f"Error in process_text_inputs: {e}")

                # Unblock the waiting caller with an error reply.
                await self.response_text_queue.put(f"Error sending message: {e}")
                await self.response_audio_queue.put(b"")
                self.response_event.set()

                await asyncio.sleep(1)

    async def receive_responses(self):
        """Task to receive responses (audio/text) from Gemini.

        Each completed turn is published as one item on each response queue,
        followed by setting ``response_event``.
        """
        while True:
            if not self.session or self.connection_status != "Connected":
                await asyncio.sleep(0.2)
                if not self.session:
                    print("receive_responses: Session closed, stopping task.")
                    break
                continue

            try:
                turn = self.session.receive()

                async for response in turn:
                    if data := response.data:
                        self.current_audio_buffer.write(data)
                    if text := response.text:
                        self.current_text_response += text

                audio_data = self.current_audio_buffer.getvalue()

                await self.response_audio_queue.put(audio_data)
                await self.response_text_queue.put(self.current_text_response)
                self.response_event.set()

            except asyncio.CancelledError:
                print("receive_responses task cancelled.")
                break
            except google.api_core.exceptions.Cancelled:
                print("Gemini receive cancelled (likely due to interruption or end)")

                # Publish whatever was received so far.
                await self.response_audio_queue.put(self.current_audio_buffer.getvalue())
                await self.response_text_queue.put(self.current_text_response + " [Receive Cancelled]")
                self.response_event.set()
            except Exception as e:
                print(f"Error receiving responses: {e}")
                traceback.print_exc()

                await self.response_audio_queue.put(b"")
                await self.response_text_queue.put(f"Error receiving response: {e}")
                self.response_event.set()

                if isinstance(e, (google.api_core.exceptions.GoogleAPICallError, ConnectionError)):
                    print("Connection error in receive_responses, pausing...")
                    self.connection_status = f"Receive Error: {e}"
                    await asyncio.sleep(5)

    async def send_message_and_wait_for_response(self, text):
        """Puts text on input queue and waits for the response event.

        Returns ``(text_response, audio_bytes)``; on any failure the first
        element is an error message and the second is ``None``.
        """
        if not self.session or self.connection_status != "Connected":
            return f"Error: Not connected ({self.connection_status}).", None

        # Reset the hand-off state *before* enqueueing the request. The event
        # stays set after the previous reply, so waiting on it without
        # clearing can return immediately, and replies that arrived after an
        # earlier timeout would otherwise be returned for this request.
        self._discard_stale_responses()
        self.response_event.clear()

        await self.text_input_queue.put(text)
        print("Waiting for response event...")
        try:
            await asyncio.wait_for(self.response_event.wait(), timeout=60.0)
            print("Response event received.")
        except asyncio.TimeoutError:
            print("Timeout waiting for Gemini response.")
            return "Error: Timeout waiting for response.", None
        except Exception as e:
            print(f"Error waiting for response event: {e}")
            return f"Error waiting for response: {e}", None

        try:
            audio_data = self.response_audio_queue.get_nowait()
            text_response = self.response_text_queue.get_nowait()
            self.response_audio_queue.task_done()
            self.response_text_queue.task_done()
        except asyncio.QueueEmpty:
            print("Error: Response queues were empty after event was set.")
            return "Internal Error: Response queues empty.", None
        except Exception as e:
            print(f"Error retrieving from response queues: {e}")
            return f"Internal Error: {e}", None

        return text_response, audio_data

    async def run_main_loop(self):
        """The main async method to establish connection and manage tasks.

        Connects (30 s timeout), starts the worker tasks, then idles until the
        status leaves "Connected" or ``session`` is cleared. All spawned tasks
        are cancelled and the session is closed on the way out.
        """
        import contextlib

        if not self._initialize_client_and_config():
            print("Initialization failed, cannot connect.")
            self.connection_status = "Connection Failed: Initialization error."
            await run_coro_in_background_loop(update_status(self.connection_status))
            return

        try:
            print(f"Attempting to connect to Gemini model: {MODEL}...")
            self.connection_status = "Connecting..."
            await run_coro_in_background_loop(update_status(self.connection_status))

            try:
                # `connect()` yields an async context manager, which cannot be
                # handed to `asyncio.wait_for` directly. Entering it through an
                # exit stack bounds only the connection attempt while still
                # guaranteeing the session is closed on exit.
                async with contextlib.AsyncExitStack() as stack:
                    session = await asyncio.wait_for(
                        stack.enter_async_context(
                            self.client.aio.live.connect(model=MODEL, config=self.config)
                        ),
                        timeout=30.0,
                    )
                    self.session = session
                    self.connection_status = "Connected"
                    print("Session established successfully.")
                    await run_coro_in_background_loop(update_status(self.connection_status))

                    self.out_queue = asyncio.Queue(maxsize=20)

                    tasks = set()
                    tasks.add(asyncio.create_task(self.process_text_inputs(), name="process_text_inputs"))
                    tasks.add(asyncio.create_task(self.receive_responses(), name="receive_responses"))

                    if self.video_mode != "none":
                        tasks.add(asyncio.create_task(self.send_realtime(), name="send_realtime"))
                        if self.video_mode == "camera":
                            print("Starting camera input task...")
                            tasks.add(asyncio.create_task(self.get_frames(), name="get_frames"))
                        elif self.video_mode == "screen":
                            print("Starting screen capture task...")
                            tasks.add(asyncio.create_task(self.get_screen(), name="get_screen"))

                    background_tasks.update(tasks)

                    # Idle until disconnect() clears the session or a worker
                    # changes the status.
                    while self.connection_status == "Connected" and self.session:
                        await asyncio.sleep(0.5)

                    print("Exiting main run loop (disconnected or error).")

            except asyncio.TimeoutError:
                print("CONNECTION FAILED: Timeout while trying to connect.")
                self.connection_status = "Connection Failed: Timeout"
                await run_coro_in_background_loop(update_status(self.connection_status))
            except google.api_core.exceptions.PermissionDenied as e:
                print(f"CONNECTION FAILED: Permission Denied. Check API key and permissions. {e}")
                self.connection_status = "Connection Failed: Permission Denied"
                await run_coro_in_background_loop(update_status(f"{self.connection_status}. Check API Key."))
            except google.api_core.exceptions.InvalidArgument as e:
                print(f"CONNECTION FAILED: Invalid Argument. Check model name ('{MODEL}') and config. {e}")
                self.connection_status = "Connection Failed: Invalid Argument (Model/Config?)"
                await run_coro_in_background_loop(update_status(f"{self.connection_status} Details: {e}"))
            except Exception as e:
                print(f"CONNECTION FAILED: An unexpected error occurred during connection. {e}")
                traceback.print_exc()
                self.connection_status = f"Connection Failed: {e}"
                await run_coro_in_background_loop(update_status(self.connection_status))

        except asyncio.CancelledError:
            print("run_main_loop task cancelled.")
            self.connection_status = "Disconnected (Cancelled)"
        except Exception as e:
            print(f"Error in AudioLoop run_main_loop: {e}")
            traceback.print_exc()
            self.connection_status = f"Runtime Error: {e}"
            await run_coro_in_background_loop(update_status(self.connection_status))
        finally:
            print("Cleaning up audio loop resources...")
            final_status = self.connection_status
            # A clean exit ends in "Disconnected"; error statuses are kept so
            # the caller can still see why the session ended.
            if final_status in ("Connected", "Disconnecting"):
                final_status = "Disconnected"
                self.connection_status = "Disconnected"

            tasks_to_cancel = list(background_tasks)
            background_tasks.clear()
            for task in tasks_to_cancel:
                if task and not task.done():
                    task.cancel()
                    print(f"Cancelled task: {task.get_name()}")

            self.session = None
            self.client = None
            self.out_queue = None

            print("AudioLoop run finished.")
            await run_coro_in_background_loop(update_status(final_status))

    async def disconnect(self):
        """Initiates the disconnection process.

        Clearing ``session`` makes ``run_main_loop`` and the worker tasks wind
        down; the ``None`` sentinel stops ``process_text_inputs``.
        """
        print("Disconnect requested.")
        if self.session:
            print("Setting status to Disconnecting...")
            self.connection_status = "Disconnecting"
            self.session = None
            await self.text_input_queue.put(None)
        else:
            self.connection_status = "Disconnected"

        await run_coro_in_background_loop(update_status("Disconnected"))
|
|
|
|
|
|
|
|
|
def start_asyncio_loop():
    """Starts the asyncio event loop in a separate thread.

    Intended as a thread target: creates a fresh loop, publishes it as
    ``background_loop`` and runs it in 0.1 s slices so that
    ``stop_background_loop`` is polled regularly. On shutdown the remaining
    tasks are cancelled and awaited, then the loop is closed and
    ``background_loop`` is reset to ``None``.
    """
    global background_loop, stop_background_loop
    stop_background_loop = False
    loop = asyncio.new_event_loop()
    background_loop = loop
    asyncio.set_event_loop(loop)
    print("Background asyncio loop starting.")
    try:
        while not stop_background_loop:
            wakeup = loop.call_later(0.1, loop.stop)
            loop.run_forever()
            # If the loop was stopped early from another thread the timer is
            # still armed; left alone it would fire during the final
            # run_until_complete() below and abort it.
            wakeup.cancel()
        print("Stop signal received, exiting run_forever loop.")

        print("Running pending tasks before closing loop...")
        pending = asyncio.all_tasks(loop=loop)
        # Long-running tasks never finish on their own; cancel them so the
        # gather below terminates and their cleanup handlers run.
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))

    except Exception as e:
        print(f"Error in background loop: {e}")
        traceback.print_exc()
    finally:
        print("Closing background loop...")
        loop.close()
        print("Background asyncio loop stopped.")
        background_loop = None
|
|
|
def stop_asyncio_loop():
    """Signals the background asyncio loop to stop.

    Raises the ``stop_background_loop`` flag and, when the loop is currently
    running, asks it (thread-safely) to stop right away.
    """
    global stop_background_loop
    print("Signalling background loop to stop...")
    stop_background_loop = True

    loop = background_loop
    if not loop or not loop.is_running():
        return
    loop.call_soon_threadsafe(loop.stop)
|
|
|
async def run_coro_in_background_loop(coro):
    """Submits a coroutine to the background event loop and returns its future.

    This is itself a coroutine function: awaiting it schedules *coro* on
    ``background_loop`` and returns the resulting
    ``concurrent.futures.Future`` immediately, without waiting for *coro* to
    finish. Returns ``None`` (after closing *coro*) when the loop is missing,
    not running, being stopped, or rejects the submission.
    """
    loop = background_loop
    if loop is not None and loop.is_running() and not stop_background_loop:
        try:
            return asyncio.run_coroutine_threadsafe(coro, loop)
        except RuntimeError as e:
            print(f"Error submitting coroutine (loop shutting down?): {e}")
        except Exception as e:
            print(f"Unexpected error submitting coroutine: {e}")
    else:
        print("Error: Background asyncio loop not running or stopping.")

    # The coroutine will never run; close it so Python does not emit a
    # "coroutine was never awaited" warning.
    if asyncio.iscoroutine(coro):
        coro.close()
    return None
|
|
|
def format_audio_for_gradio(pcm_data):
    """Converts raw PCM data to a format Gradio's Audio component can use.

    Args:
        pcm_data: raw 16-bit PCM sample bytes.

    Returns:
        ``(RECEIVE_SAMPLE_RATE, samples)`` where ``samples`` is a 1-D
        ``numpy.int16`` array, or ``None`` when *pcm_data* is empty or cannot
        be decoded.
    """
    if not pcm_data:
        return None
    try:
        # Decode the samples directly; no WAV container or audio device is
        # needed for this. An incomplete trailing sample is dropped.
        sample_width = np.dtype(np.int16).itemsize
        usable = len(pcm_data) - (len(pcm_data) % sample_width)
        samples = np.frombuffer(pcm_data[:usable], dtype=np.int16)
        return (RECEIVE_SAMPLE_RATE, samples)
    except Exception as e:
        print(f"Error formatting audio: {e}")
        traceback.print_exc()
        return None
|
|
|
def initialize_py_audio():
    """Create the shared PyAudio instance if it does not exist yet.

    Returns ``True`` when ``pya`` is available afterwards, ``False`` when
    creating it failed (``pya`` is then left as ``None``).
    """
    global pya
    if pya is not None:
        return True

    try:
        print("Initializing PyAudio...")
        pya = pyaudio.PyAudio()
        print("PyAudio initialized.")
    except Exception as e:
        print(f"Failed to initialize PyAudio: {e}")
        pya = None
        return False
    return True
|
|
|
def terminate_py_audio():
    """Terminate the shared PyAudio instance, if any, and reset ``pya``.

    Errors raised by ``terminate()`` are printed, not propagated.
    """
    global pya
    if not pya:
        return

    print("Terminating PyAudio...")
    try:
        pya.terminate()
    except Exception as e:
        print(f"Error terminating PyAudio: {e}")
    finally:
        pya = None
        print("PyAudio terminated.")
|
|
|
|
|
|
|
|
|
|
|
async def update_status(new_status: str):
    """Coroutine to update the Gradio status component.

    Currently this only logs *new_status* to stdout.
    """
    message = f"Status Update (async): {new_status}"
    print(message)
|
|
|
|
|
|
|
|
|
def handle_connect(api_key, voice_name, video_mode):
    """Handles the 'Connect' button click.

    Generator yielding ``(status, response_text, audio)`` tuples for the
    Gradio outputs. Starts the background event-loop thread when needed,
    initialises audio, creates a new ``GradioAudioLoop`` and schedules its
    ``run_main_loop`` on the background loop.
    """
    global audio_loop_instance, background_thread
    print("\n--- Connect Button Clicked ---")
    yield "Connecting...", None, None

    if not api_key:
        yield "Error: Please enter a Gemini API key.", None, None
        return

    # Only a session that is starting, live or shutting down blocks a new
    # connection. Every other status (including the formatted error
    # statuses, which embed exception text) means the previous attempt is
    # over and may be replaced.
    active_states = ("Initialized", "Connecting...", "Connected", "Disconnecting")
    if audio_loop_instance and audio_loop_instance.connection_status in active_states:
        yield f"Already connected or connecting ({audio_loop_instance.connection_status}). Disconnect first.", None, None
        return

    if not background_thread or not background_thread.is_alive():
        print("Starting background thread...")
        background_thread = threading.Thread(target=start_asyncio_loop, daemon=True)
        background_thread.start()
        time.sleep(0.5)  # give the loop a moment to start

    if not initialize_py_audio():
        yield "Error: Failed to initialize audio system.", None, None
        return

    print(f"Attempting to connect with voice: {voice_name}, video: {video_mode}")
    audio_loop_instance = GradioAudioLoop(video_mode=video_mode, api_key=api_key, voice_name=voice_name)

    # run_coro_in_background_loop is a coroutine function; drive it to
    # completion here to obtain the concurrent.futures.Future it returns
    # (or None when scheduling failed).
    connect_future = asyncio.run(
        run_coro_in_background_loop(audio_loop_instance.run_main_loop())
    )
    if not connect_future:
        audio_loop_instance = None
        yield "Error: Failed to schedule connection task.", None, None
        return

    # This handler is a plain (non-async) generator, so wait with a blocking
    # sleep while the connection attempt progresses on the background loop.
    time.sleep(1.5)

    if audio_loop_instance:
        yield audio_loop_instance.connection_status, None, None
    else:
        yield "Error: Connection process failed unexpectedly.", None, None
|
|
|
|
|
def handle_disconnect():
    """Handles the 'Disconnect' button click.

    Generator yielding ``(status, response_text, audio)`` tuples. Schedules
    ``disconnect()`` on the background loop, waits up to 5 s for it and then
    drops the module-level loop instance.
    """
    global audio_loop_instance
    import concurrent.futures

    print("\n--- Disconnect Button Clicked ---")
    yield "Disconnecting...", None, None

    if not audio_loop_instance or audio_loop_instance.connection_status == "Disconnected":
        yield "Already disconnected.", None, None
        return

    # run_coro_in_background_loop is a coroutine function; drive it to
    # completion to obtain the concurrent.futures.Future it returns.
    disconnect_future = asyncio.run(
        run_coro_in_background_loop(audio_loop_instance.disconnect())
    )
    if not disconnect_future:
        yield "Error: Failed to schedule disconnection task.", None, None
        return

    try:
        disconnect_future.result(timeout=5.0)
        status = "Disconnected"
    # Before Python 3.11 this is not the builtin TimeoutError.
    except concurrent.futures.TimeoutError:
        status = "Disconnect timeout. Check logs."
        print("Timeout waiting for disconnect confirmation.")
    except Exception as e:
        status = f"Error during disconnect: {e}"
        print(f"Error during disconnect future result: {e}")

    audio_loop_instance = None

    yield status, None, None
|
|
|
|
|
def handle_send_message(message):
    """Handles sending a text message.

    Generator yielding ``(response_text, audio)`` tuples for the Gradio
    outputs: first a progress message, then the final reply. ``audio`` is
    either ``None`` or the value produced by ``format_audio_for_gradio``.
    """
    import concurrent.futures

    # Guard the slice: the message is only validated further down.
    print(f"\n--- Sending Message: {(message or '')[:30]}... ---")

    # Work on one snapshot so a concurrent disconnect cannot swap the
    # instance between the check and the call.
    loop_instance = audio_loop_instance
    if not loop_instance or loop_instance.connection_status != "Connected":
        yield "Error: Not connected. Cannot send message.", None
        return

    if not message or message.strip() == "":
        yield "Cannot send empty message.", None
        return

    yield "Sending message...", None

    # run_coro_in_background_loop is a coroutine function; drive it to
    # completion to obtain the concurrent.futures.Future it returns.
    response_future = asyncio.run(
        run_coro_in_background_loop(
            loop_instance.send_message_and_wait_for_response(message)
        )
    )
    if not response_future:
        yield "Error: Failed to schedule message task.", None
        return

    text_response = "Error: No response received."
    audio_output = None
    try:
        result_text, result_audio_data = response_future.result(timeout=60.0)

        text_response = result_text
        if result_audio_data:
            print(f"Received audio data ({len(result_audio_data)} bytes), formatting...")
            audio_output = format_audio_for_gradio(result_audio_data)
            if audio_output is None:
                print("Failed to format audio for Gradio.")
                text_response += " [Audio Formatting Error]"
        else:
            print("No audio data received in response.")
            text_response += " [No Audio Received]"

    # Before Python 3.11 this is not the builtin TimeoutError.
    except concurrent.futures.TimeoutError:
        print("Timeout waiting for response future.")
        text_response = "Error: Timeout waiting for Gemini response."
    except Exception as e:
        print(f"Error getting result from response future: {e}")
        traceback.print_exc()
        text_response = f"Error processing response: {e}"

    print(f"Final Text Response: {text_response}")
    print(f"Final Audio Output: {'Present' if audio_output else 'None'}")
    yield text_response, audio_output
|
|
|
|
|
|
|
# Gradio layout and event wiring.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Gemini LiveConnect TTS Interface")
    gr.Markdown(f"Using Model: `{MODEL}`")

    # Connection settings.
    with gr.Row():
        api_key_input = gr.Textbox(
            label="Gemini API Key",
            type="password",
            placeholder="Enter your API key",
        )
        voice_select = gr.Dropdown(
            label="Select Voice",
            choices=AVAILABLE_VOICES,
            value=DEFAULT_VOICE,
        )
        # Present as an input for handle_connect but hidden in the UI.
        video_mode_select = gr.Radio(
            label="Video Input (Optional)",
            choices=["none", "camera", "screen"],
            value=DEFAULT_VIDEO_MODE,
            visible=False,
        )

    with gr.Row():
        connect_button = gr.Button("Connect")
        disconnect_button = gr.Button("Disconnect")

    status_output = gr.Textbox(label="Status", value="Disconnected", interactive=False)

    # Message entry.
    with gr.Column():
        message_input = gr.Textbox(label="Your Message", placeholder="Type your message here...")
        send_button = gr.Button("Send Message")

    # Reply display.
    with gr.Column():
        gr.Markdown("## Response")
        response_text_output = gr.Textbox(label="Gemini Text", interactive=False)
        audio_output = gr.Audio(label="Gemini Audio", type="numpy", interactive=False)

    # Connect/disconnect update the status and reset both reply widgets.
    connect_button.click(
        fn=handle_connect,
        inputs=[api_key_input, voice_select, video_mode_select],
        outputs=[status_output, response_text_output, audio_output],
    )
    disconnect_button.click(
        fn=handle_disconnect,
        inputs=[],
        outputs=[status_output, response_text_output, audio_output],
    )

    # The button and pressing Enter in the textbox send the message alike.
    for register_send in (send_button.click, message_input.submit):
        register_send(
            fn=handle_send_message,
            inputs=[message_input],
            outputs=[response_text_output, audio_output],
        )
|
|
|
|
|
|
|
def cleanup():
    """Shut everything down: session, background loop/thread and PyAudio.

    Every step is best-effort; failures are printed and the remaining steps
    still run.
    """
    global audio_loop_instance
    print("Running cleanup...")

    if audio_loop_instance and audio_loop_instance.connection_status != "Disconnected":
        print("Disconnecting during cleanup...")
        try:
            # run_coro_in_background_loop is a coroutine function; drive it
            # to completion to obtain the concurrent.futures.Future it
            # returns (or None when scheduling failed).
            disconnect_future = asyncio.run(
                run_coro_in_background_loop(audio_loop_instance.disconnect())
            )
            if disconnect_future:
                disconnect_future.result(timeout=5.0)
                print("Disconnect successful during cleanup.")
        except Exception as e:
            print(f"Error during cleanup disconnect: {e}")
    audio_loop_instance = None

    stop_asyncio_loop()

    if background_thread and background_thread.is_alive():
        print("Waiting for background thread to join...")
        background_thread.join(timeout=5.0)
        if background_thread.is_alive():
            print("Warning: Background thread did not exit cleanly.")

    terminate_py_audio()
    print("Cleanup finished.")
|
|
|
|
|
# Make sure resources are released when the interpreter exits.
atexit.register(cleanup)


if __name__ == "__main__":
    print("Launching Gradio Interface...")

    # queue() is needed because the event handlers are generators.
    app = demo.queue()
    app.launch(share=False)

    print("Gradio Interface closed.")
|
|