Athspi commited on
Commit
bdfd7a5
·
verified ·
1 Parent(s): fba1da6

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +911 -235
app.py CHANGED
@@ -1,294 +1,970 @@
1
- ## Documentation
2
- # Quickstart: https://github.com/google-gemini/cookbook/blob/main/quickstarts/Get_started_LiveAPI.py
3
- #
4
- ## Setup
5
- #
6
- # To install the dependencies for this script, run:
7
- #
8
- # ```
9
- # pip install google-genai opencv-python pyaudio pillow mss
10
- # ```
11
-
12
 
 
13
  import asyncio
14
  import base64
15
  import io
16
  import traceback
17
-
18
  import cv2
19
  import pyaudio
20
  import PIL.Image
21
  import mss
22
-
23
- import argparse
24
-
25
- from google import genai
26
- from google.genai import types
27
-
28
- import gradio as gr
29
-
 
 
 
 
 
30
  FORMAT = pyaudio.paInt16
31
  CHANNELS = 1
32
  SEND_SAMPLE_RATE = 16000
33
- RECEIVE_SAMPLE_RATE = 24000
34
  CHUNK_SIZE = 1024
35
-
36
- MODEL = "models/gemini-2.0-flash-exp"
37
-
38
- DEFAULT_MODE = "camera"
39
-
40
- # Replace with your actual API key
41
- # client = genai.Client(http_options={"api_version": "v1alpha"}, api_key="YOUR_API_KEY")
42
- client = genai.Client(http_options={"api_version": "v1alpha"}, api_key="GEMINI_API_KEY")
43
-
44
- # While Gemini 2.0 Flash is in experimental preview mode, only one of AUDIO or
45
- # TEXT may be passed here.
46
- CONFIG = types.LiveConnectConfig(
47
- response_modalities=[
48
- "audio",
49
- ],
50
- speech_config=types.SpeechConfig(
51
- voice_config=types.VoiceConfig(
52
- prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Puck")
53
- )
54
- ),
55
- system_instruction=types.Content(
56
- parts=[types.Part.from_text(text="Answer user ask replay same thing user say no other word explain ")],
57
- role="user"
58
- ),
59
- )
60
-
61
- pya = pyaudio.PyAudio()
62
-
63
-
64
- class AudioLoop:
65
- def __init__(self, video_mode=DEFAULT_MODE):
66
  self.video_mode = video_mode
67
-
68
- self.audio_in_queue = None
69
- self.out_queue = None
70
-
71
  self.session = None
72
-
73
- self.send_text_task = None
74
- self.receive_audio_task = None
75
- self.play_audio_task = None
76
-
77
- async def send_text(self, text):
78
- # while True:
79
- # text = await asyncio.to_thread(
80
- # input,
81
- # "message > ",
82
- # )
83
- # if text.lower() == "q":
84
- # break
85
- await self.session.send(input=text or ".", end_of_turn=True)
86
 
87
  def _get_frame(self, cap):
88
- # Read the frameq
89
  ret, frame = cap.read()
90
- # Check if the frame was read successfully
91
- if not ret:
92
- return None
93
- # Fix: Convert BGR to RGB color space
94
- # OpenCV captures in BGR but PIL expects RGB format
95
- # This prevents the blue tint in the video feed
96
  frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
97
- img = PIL.Image.fromarray(frame_rgb) # Now using RGB frame
98
  img.thumbnail([1024, 1024])
99
-
100
  image_io = io.BytesIO()
101
  img.save(image_io, format="jpeg")
102
  image_io.seek(0)
103
-
104
  mime_type = "image/jpeg"
105
  image_bytes = image_io.read()
106
  return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}
107
 
108
  async def get_frames(self):
109
- # This takes about a second, and will block the whole program
110
- # causing the audio pipeline to overflow if you don't to_thread it.
111
- cap = await asyncio.to_thread(
112
- cv2.VideoCapture, 0
113
- ) # 0 represents the default camera
114
-
115
- while True:
116
- frame = await asyncio.to_thread(self._get_frame, cap)
117
- if frame is None:
118
- break
119
-
120
- await asyncio.sleep(1.0)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
 
122
- await self.out_queue.put(frame)
 
 
 
 
 
 
 
 
 
123
 
124
- # Release the VideoCapture object
125
- cap.release()
126
 
127
  def _get_screen(self):
128
- sct = mss.mss()
129
- monitor = sct.monitors[0]
130
-
131
- i = sct.grab(monitor)
132
-
133
- mime_type = "image/jpeg"
134
- image_bytes = mss.tools.to_png(i.rgb, i.size)
135
- img = PIL.Image.open(io.BytesIO(image_bytes))
136
-
137
- image_io = io.BytesIO()
138
- img.save(image_io, format="jpeg")
139
- image_io.seek(0)
140
-
141
- image_bytes = image_io.read()
142
- return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
143
 
144
  async def get_screen(self):
145
-
146
  while True:
 
 
 
 
147
  frame = await asyncio.to_thread(self._get_screen)
148
  if frame is None:
149
- break
 
 
150
 
151
- await asyncio.sleep(1.0)
 
 
152
 
153
- await self.out_queue.put(frame)
154
 
155
  async def send_realtime(self):
 
156
  while True:
157
- msg = await self.out_queue.get()
158
- await self.session.send(input=msg)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159
 
160
- async def listen_audio(self):
161
- mic_info = pya.get_default_input_device_info()
162
- self.audio_stream = await asyncio.to_thread(
163
- pya.open,
164
- format=FORMAT,
165
- channels=CHANNELS,
166
- rate=SEND_SAMPLE_RATE,
167
- input=True,
168
- input_device_index=mic_info["index"],
169
- frames_per_buffer=CHUNK_SIZE,
170
- )
171
- if __debug__:
172
- kwargs = {"exception_on_overflow": False}
173
- else:
174
- kwargs = {}
175
- while True:
176
- data = await asyncio.to_thread(self.audio_stream.read, CHUNK_SIZE, **kwargs)
177
- await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})
178
 
179
- async def receive_audio(self):
180
- "Background task to reads from the websocket and write pcm chunks to the output queue"
181
- while True:
182
- turn = self.session.receive()
183
- async for response in turn:
184
- if data := response.data:
185
- self.audio_in_queue.put_nowait(data)
186
- continue
187
- if text := response.text:
188
- # print(text, end="") # Don't print to console, return it
189
- return text # Return the text to Gradio
190
-
191
- # If you interrupt the model, it sends a turn_complete.
192
- # For interruptions to work, we need to stop playback.
193
- # So empty out the audio queue because it may have loaded
194
- # much more audio than has played yet.
195
- while not self.audio_in_queue.empty():
196
- self.audio_in_queue.get_nowait()
197
-
198
- async def play_audio(self):
199
- stream = await asyncio.to_thread(
200
- pya.open,
201
- format=FORMAT,
202
- channels=CHANNELS,
203
- rate=RECEIVE_SAMPLE_RATE,
204
- output=True,
205
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
206
  while True:
207
- bytestream = await self.audio_in_queue.get()
208
- await asyncio.to_thread(stream.write, bytestream)
209
-
210
- async def run(self):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
211
  try:
212
- async with (
213
- client.aio.live.connect(model=MODEL, config=CONFIG) as session,
214
- asyncio.TaskGroup() as tg,
215
- ):
216
- self.session = session
217
-
218
- self.audio_in_queue = asyncio.Queue()
219
- self.out_queue = asyncio.Queue(maxsize=5)
220
-
221
- # send_text_task = tg.create_task(self.send_text()) #No text task anymore.
222
- tg.create_task(self.send_realtime())
223
- tg.create_task(self.listen_audio())
224
- if self.video_mode == "camera":
225
- tg.create_task(self.get_frames())
226
- elif self.video_mode == "screen":
227
- tg.create_task(self.get_screen())
228
-
229
- tg.create_task(self.receive_audio())
230
- tg.create_task(self.play_audio())
231
-
232
- # await send_text_task
233
- # raise asyncio.CancelledError("User requested exit")
234
- return await self.receive_audio() #return audio transcript result
235
-
236
- except asyncio.CancelledError:
237
- pass
238
- except ExceptionGroup as EG:
239
- self.audio_stream.close()
240
- traceback.print_exception(EG)
241
  except Exception as e:
242
- traceback.print_exc() # Print the traceback for debugging
243
- return f"Error: {str(e)}" # Return error message
244
 
 
 
 
 
 
 
 
 
 
 
 
 
 
245
 
246
- # Global instance
247
- audio_loop = None # Initialize the AudioLoop object
248
-
249
- async def transcribe_audio(text_input):
250
- """
251
- Transcribes audio using the AudioLoop class and returns the result.
252
- """
253
- global audio_loop
254
- if audio_loop is None:
255
- audio_loop = AudioLoop(video_mode="none") # Instantiate the class only once
256
- # You might want to handle the initialization differently based on your needs.
257
 
258
- loop = asyncio.get_event_loop()
259
 
260
- # if loop.is_running():
261
- # print("Async event loop already running. Using existing loop.")
262
- # task = loop.create_task(audio_loop.send_text(text_input))
263
- # return await task
264
- # else:
265
- # print("Starting new async event loop.")
266
- # return asyncio.run(audio_loop.send_text(text_input))
 
267
 
268
- if audio_loop.session is None:
269
  try:
270
- return await audio_loop.run()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
271
  except Exception as e:
272
- print(f"Error in run(): {e}")
273
  traceback.print_exc()
274
- return f"Error: {str(e)}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
275
  else:
276
- try:
277
- await audio_loop.send_text(text_input)
278
- return await audio_loop.receive_audio() # Assuming receive_audio returns a string
279
- except Exception as e:
280
- print(f"Error after session is established: {e}")
281
- traceback.print_exc()
282
- return f"Error: {str(e)}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
283
 
284
 
285
- # Gradio interface
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
286
  if __name__ == "__main__":
287
- iface = gr.Interface(
288
- fn=transcribe_audio,
289
- inputs=gr.Textbox(lines=2, placeholder="Enter text here..."),
290
- outputs="text",
291
- title="Gemini Live Connect Demo with Gradio",
292
- description="Enter text, and the model will replay same you said. This is a demo of the Gemini Live Connect API with Gradio.",
293
- )
294
- iface.launch()
 
 
 
 
 
1
+ # File: app.py
 
 
 
 
 
 
 
 
 
 
2
 
3
+ import gradio as gr
4
  import asyncio
5
  import base64
6
  import io
7
  import traceback
 
8
  import cv2
9
  import pyaudio
10
  import PIL.Image
11
  import mss
12
+ import google.generativeai as genai
13
+ from google.generativeai import types
14
+ import google.api_core.exceptions
15
+ import wave
16
+ import numpy as np
17
+ import threading
18
+ import queue
19
+ import os
20
+ import time
21
+ import tempfile
22
+ import atexit # For cleanup
23
+
24
+ # --- Constants ---
25
  FORMAT = pyaudio.paInt16
26
  CHANNELS = 1
27
  SEND_SAMPLE_RATE = 16000
28
+ RECEIVE_SAMPLE_RATE = 24000 # Gemini outputs at 24kHz
29
  CHUNK_SIZE = 1024
30
+ MODEL = "models/gemini-2.0-flash-exp" # Use the requested experimental model
31
+ DEFAULT_VIDEO_MODE = "none"
32
+ AVAILABLE_VOICES = ["Puck", "Charon", "Kore", "Fenrir", "Aoede"]
33
+ DEFAULT_VOICE = "Puck"
34
+ SYSTEM_INSTRUCTION_TEXT = "Answer user ask replay same thing user say no other word explain "
35
+
36
+ # --- Global State ---
37
+ audio_loop_instance = None
38
+ background_tasks = set()
39
+ background_loop = None # Event loop for the background thread
40
+ pya = None # Initialize PyAudio globally later
41
+ background_thread = None # Keep track of the thread
42
+ stop_background_loop = False # Flag to signal loop termination
43
+
44
+ # --- Original AudioLoop Class Methods (Included for potential future use) ---
45
+ # Note: We inherit from the original structure for clarity but override key methods
46
+ class OriginalAudioLoop:
47
+ """Base class structure placeholder - includes relevant methods from original script"""
48
+ def __init__(self, video_mode=DEFAULT_VIDEO_MODE):
 
 
 
 
 
 
 
 
 
 
 
 
49
  self.video_mode = video_mode
50
+ self.out_queue = None # Queue for data *to* Gemini (mic audio, images)
 
 
 
51
  self.session = None
52
+ self.audio_stream = None # Mic input stream
 
 
 
 
 
 
 
 
 
 
 
 
 
53
 
54
  def _get_frame(self, cap):
 
55
  ret, frame = cap.read()
56
+ if not ret: return None
 
 
 
 
 
57
  frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
58
+ img = PIL.Image.fromarray(frame_rgb)
59
  img.thumbnail([1024, 1024])
 
60
  image_io = io.BytesIO()
61
  img.save(image_io, format="jpeg")
62
  image_io.seek(0)
 
63
  mime_type = "image/jpeg"
64
  image_bytes = image_io.read()
65
  return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}
66
 
67
  async def get_frames(self):
68
+ cap = None
69
+ try:
70
+ print("Attempting to open camera...")
71
+ cap = await asyncio.to_thread(cv2.VideoCapture, 0)
72
+ if not cap.isOpened():
73
+ print("Error: Could not open camera.")
74
+ # Signal error back to Gradio? For now, just log and exit task.
75
+ await run_coro_in_background_loop(update_status("Error: Could not open camera."))
76
+ return
77
+
78
+ print("Camera opened successfully.")
79
+ while True:
80
+ if not self.session: # Stop if disconnected
81
+ print("get_frames: Session closed, stopping camera task.")
82
+ break
83
+ # print("Reading frame from camera...")
84
+ frame = await asyncio.to_thread(self._get_frame, cap)
85
+ if frame is None:
86
+ # print("Warning: Failed to get frame from camera.")
87
+ await asyncio.sleep(0.1) # Avoid busy loop
88
+ continue # Skip putting None in queue
89
+
90
+ if self.out_queue:
91
+ # print("Putting camera frame in queue.")
92
+ await self.out_queue.put(frame)
93
+ await asyncio.sleep(1.0) # Send frame every second
94
 
95
+ except asyncio.CancelledError:
96
+ print("get_frames task cancelled.")
97
+ except Exception as e:
98
+ print(f"Error in get_frames: {e}")
99
+ await run_coro_in_background_loop(update_status(f"Camera Error: {e}"))
100
+ finally:
101
+ if cap and cap.isOpened():
102
+ print("Releasing camera.")
103
+ await asyncio.to_thread(cap.release)
104
+ print("Camera task finished.")
105
 
 
 
106
 
107
  def _get_screen(self):
108
+ try:
109
+ with mss.mss() as sct:
110
+ # Attempt to grab the primary monitor (often index 1 in mss.monitors)
111
+ monitor = sct.monitors[1]
112
+ sct_img = sct.grab(monitor)
113
+ img = PIL.Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX") # Handle BGRA
114
+
115
+ image_io = io.BytesIO()
116
+ img.thumbnail([1024, 1024]) # Resize before saving
117
+ img.save(image_io, format="jpeg")
118
+ image_io.seek(0)
119
+
120
+ mime_type = "image/jpeg"
121
+ image_bytes = image_io.read()
122
+ return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}
123
+ except IndexError:
124
+ print("Error capturing screen: Could not find monitor at index 1. Trying index 0.")
125
+ try: # Fallback to monitor 0 (usually includes all screens)
126
+ with mss.mss() as sct:
127
+ monitor = sct.monitors[0]
128
+ sct_img = sct.grab(monitor)
129
+ img = PIL.Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX")
130
+ image_io = io.BytesIO()
131
+ img.thumbnail([1024, 1024])
132
+ img.save(image_io, format="jpeg")
133
+ image_io.seek(0)
134
+ mime_type = "image/jpeg"
135
+ image_bytes = image_io.read()
136
+ return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}
137
+ except Exception as e_fallback:
138
+ print(f"Error capturing screen (fallback monitor 0): {e_fallback}")
139
+ return None
140
+ except Exception as e:
141
+ print(f"Error capturing screen: {e}")
142
+ return None
143
 
144
  async def get_screen(self):
 
145
  while True:
146
+ if not self.session: # Stop if disconnected
147
+ print("get_screen: Session closed, stopping screen task.")
148
+ break
149
+ # print("Capturing screen...")
150
  frame = await asyncio.to_thread(self._get_screen)
151
  if frame is None:
152
+ print("Warning: Failed to capture screen.")
153
+ await asyncio.sleep(1.0) # Wait before retrying if error occurred
154
+ continue # Skip putting None in queue
155
 
156
+ if self.out_queue:
157
+ # print("Putting screen frame in queue.")
158
+ await self.out_queue.put(frame)
159
 
160
+ await asyncio.sleep(1.0) # Send screen frame every second
161
 
162
  async def send_realtime(self):
163
+ """Sends microphone audio or video frames from the out_queue to Gemini."""
164
  while True:
165
+ if not self.session or not self.out_queue:
166
+ # Wait if session/queue not ready or if disconnected
167
+ await asyncio.sleep(0.1)
168
+ if not self.session: # Check again after sleep if disconnected
169
+ print("send_realtime: Session closed, stopping task.")
170
+ break
171
+ continue
172
+
173
+ try:
174
+ msg = await asyncio.wait_for(self.out_queue.get(), timeout=1.0) # Wait with timeout
175
+ if self.session: # Check again in case session closed while waiting
176
+ # print(f"Sending {msg.get('mime_type', 'unknown type')} to Gemini...")
177
+ await self.session.send(input=msg)
178
+ self.out_queue.task_done()
179
+ except asyncio.TimeoutError:
180
+ # print("send_realtime: Queue empty, waiting...")
181
+ continue # No message in queue, loop again
182
+ except asyncio.CancelledError:
183
+ print("send_realtime task cancelled.")
184
+ break
185
+ except Exception as e:
186
+ print(f"Error in send_realtime: {e}")
187
+ await run_coro_in_background_loop(update_status(f"Send Error: {e}"))
188
+ # Avoid continuous errors if session is bad
189
+ if isinstance(e, (google.api_core.exceptions.GoogleAPICallError, ConnectionError)):
190
+ print("Connection error in send_realtime, pausing...")
191
+ await asyncio.sleep(5)
192
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
193
 
194
+ async def listen_audio(self):
195
+ """Listens to microphone and puts audio chunks onto the out_queue."""
196
+ global pya
197
+ if not pya:
198
+ print("Error: PyAudio not initialized in listen_audio.")
199
+ await run_coro_in_background_loop(update_status("Error: Audio system not ready."))
200
+ return
201
+
202
+ mic_info = None
203
+ stream = None
204
+ try:
205
+ print("Attempting to open microphone...")
206
+ mic_info = await asyncio.to_thread(pya.get_default_input_device_info)
207
+ stream = await asyncio.to_thread(
208
+ pya.open,
209
+ format=FORMAT,
210
+ channels=CHANNELS,
211
+ rate=SEND_SAMPLE_RATE,
212
+ input=True,
213
+ input_device_index=mic_info["index"],
214
+ frames_per_buffer=CHUNK_SIZE,
215
+ )
216
+ self.audio_stream = stream # Store reference for cleanup
217
+ print("Microphone stream opened.")
218
+ if __debug__:
219
+ kwargs = {"exception_on_overflow": False}
220
+ else:
221
+ kwargs = {}
222
+
223
+ while True:
224
+ if not self.session: # Stop if disconnected
225
+ print("listen_audio: Session closed, stopping microphone task.")
226
+ break
227
+ try:
228
+ # print("Reading from microphone...")
229
+ data = await asyncio.to_thread(stream.read, CHUNK_SIZE, **kwargs)
230
+ if self.out_queue:
231
+ # print("Putting microphone data in queue.")
232
+ await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})
233
+ except IOError as e:
234
+ # This often happens if the buffer overflows or the stream is closed abruptly
235
+ # print(f"PyAudio read error (possible overflow or stream closed): {e}")
236
+ await asyncio.sleep(0.05) # Short pause before trying again
237
+ except asyncio.CancelledError:
238
+ print("listen_audio task cancelled.")
239
+ break
240
+
241
+ except OSError as e:
242
+ print(f"Error opening microphone: {e}. Is a microphone connected and accessible?")
243
+ await run_coro_in_background_loop(update_status(f"Mic Error: {e}"))
244
+ except Exception as e:
245
+ print(f"Error in listen_audio: {e}")
246
+ traceback.print_exc()
247
+ await run_coro_in_background_loop(update_status(f"Mic Error: {e}"))
248
+ finally:
249
+ if stream:
250
+ print("Stopping and closing microphone stream.")
251
+ await asyncio.to_thread(stream.stop_stream)
252
+ await asyncio.to_thread(stream.close)
253
+ self.audio_stream = None # Clear reference
254
+ print("Microphone stream closed.")
255
+
256
+
257
+ # --- Gradio Specific Audio Loop ---
258
+ class GradioAudioLoop(OriginalAudioLoop): # Inherit and modify/add methods
259
+ def __init__(self, video_mode=DEFAULT_VIDEO_MODE, api_key=None, voice_name=DEFAULT_VOICE):
260
+ super().__init__(video_mode)
261
+ self.api_key = api_key
262
+ self.voice_name = voice_name
263
+ self.client = None
264
+ self.config = None
265
+ self.connection_status = "Disconnected" # Internal status
266
+
267
+ # Queues for communication between Gradio handler and background loop
268
+ self.text_input_queue = asyncio.Queue()
269
+ self.response_text_queue = asyncio.Queue()
270
+ self.response_audio_queue = asyncio.Queue()
271
+ self.response_event = asyncio.Event() # Signal when response is ready
272
+
273
+ # Buffers for accumulating response data within a turn
274
+ self.current_audio_buffer = io.BytesIO()
275
+ self.current_text_response = ""
276
+
277
+ def _initialize_client_and_config(self):
278
+ """Initialize Gemini client and configuration."""
279
+ if not self.api_key:
280
+ raise ValueError("API key is not set.")
281
+ try:
282
+ # Use v1beta for experimental models if needed, adjust if stable
283
+ # http_options={"api_version": "v1beta"} # Try if v1alpha causes issues
284
+ # Check if GEMINI_API_KEY env var exists, otherwise use provided key
285
+ api_key_to_use = os.getenv("GEMINI_API_KEY", self.api_key)
286
+ if not api_key_to_use:
287
+ raise ValueError("No API key provided or found in GEMINI_API_KEY environment variable.")
288
+
289
+ # Use Client instead of genai.configure if passing key directly
290
+ print("Initializing Gemini Client...")
291
+ self.client = genai.Client(api_key=api_key_to_use)
292
+
293
+ print(f"Setting up LiveConnectConfig with voice: {self.voice_name}")
294
+ self.config = types.LiveConnectConfig(
295
+ response_modalities=["audio", "text"], # Get both audio and text
296
+ speech_config=types.SpeechConfig(
297
+ voice_config=types.VoiceConfig(
298
+ prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=self.voice_name) # Use selected voice
299
+ )
300
+ ),
301
+ system_instruction=types.Content(
302
+ parts=[types.Part.from_text(text=SYSTEM_INSTRUCTION_TEXT)],
303
+ role="user" # System instructions often role='user'
304
+ ),
305
+ )
306
+ print("Gemini client and config initialized successfully.")
307
+ self.connection_status = "Initialized"
308
+ return True # Indicate success
309
+ except Exception as e:
310
+ print(f"Error initializing Gemini client: {e}")
311
+ self.client = None
312
+ self.config = None
313
+ self.connection_status = f"Initialization Error: {e}"
314
+ return False # Indicate failure
315
+
316
+
317
+ async def process_text_inputs(self):
318
+ """ Task to wait for text input from Gradio and send it to Gemini """
319
+ while True:
320
+ try:
321
+ # Wait indefinitely for an item from the queue
322
+ text_to_send = await self.text_input_queue.get()
323
+
324
+ if text_to_send is None: # Use None as a signal to stop
325
+ print("Stopping text input processing.")
326
+ break
327
+
328
+ if self.session and self.connection_status == "Connected":
329
+ print(f"Sending text to Gemini: {text_to_send[:50]}...")
330
+ # Reset response holders before sending new message
331
+ self.current_audio_buffer = io.BytesIO()
332
+ self.current_text_response = ""
333
+ self.response_event.clear()
334
+ # Send text and indicate end of turn
335
+ await self.session.send(input=text_to_send or ".", end_of_turn=True)
336
+ print("Text sent, waiting for response...")
337
+ else:
338
+ print(f"Warning: Cannot send text. Session not active or status is {self.connection_status}.")
339
+ # Signal back an error to the waiting Gradio handler
340
+ await self.response_text_queue.put(f"Error: Not connected or connection issue ({self.connection_status}). Cannot send message.")
341
+ await self.response_audio_queue.put(b"") # Empty audio
342
+ self.response_event.set() # Unblock the handler
343
+
344
+ self.text_input_queue.task_done() # Mark task as done
345
+
346
+ except asyncio.CancelledError:
347
+ print("process_text_inputs task cancelled.")
348
+ break
349
+ except Exception as e:
350
+ print(f"Error in process_text_inputs: {e}")
351
+ # Signal error back to the waiting Gradio handler
352
+ await self.response_text_queue.put(f"Error sending message: {e}")
353
+ await self.response_audio_queue.put(b"")
354
+ self.response_event.set()
355
+ # Avoid loop BSoD on continuous errors
356
+ await asyncio.sleep(1)
357
+
358
+
359
+ async def receive_responses(self):
360
+ """ Task to receive responses (audio/text) from Gemini """
361
  while True:
362
+ if not self.session or self.connection_status != "Connected":
363
+ # print("receive_responses: Session not ready or not connected, waiting...")
364
+ await asyncio.sleep(0.2)
365
+ if not self.session: # Check if disconnected while waiting
366
+ print("receive_responses: Session closed, stopping task.")
367
+ break
368
+ continue
369
+
370
+ try:
371
+ # print("Waiting for Gemini turn...")
372
+ turn = self.session.receive() # This blocks until a turn starts
373
+ # print("Gemini turn started.")
374
+ async for response in turn:
375
+ if data := response.data:
376
+ # print(f"Received audio chunk: {len(data)} bytes")
377
+ self.current_audio_buffer.write(data)
378
+ if text := response.text:
379
+ # print(f"Received text chunk: {text}")
380
+ self.current_text_response += text
381
+
382
+ # Turn complete - put results onto response queues and signal Gradio handler
383
+ # print("Gemini turn complete.")
384
+ audio_data = self.current_audio_buffer.getvalue()
385
+ # print(f"Total audio received: {len(audio_data)} bytes")
386
+ # print(f"Total text received: {self.current_text_response}")
387
+
388
+ await self.response_audio_queue.put(audio_data)
389
+ await self.response_text_queue.put(self.current_text_response)
390
+ self.response_event.set() # Signal that response is ready for the Gradio handler
391
+
392
+ except asyncio.CancelledError:
393
+ print("receive_responses task cancelled.")
394
+ break
395
+ except google.api_core.exceptions.Cancelled:
396
+ print("Gemini receive cancelled (likely due to interruption or end)")
397
+ # Signal completion even if cancelled externally
398
+ await self.response_audio_queue.put(self.current_audio_buffer.getvalue())
399
+ await self.response_text_queue.put(self.current_text_response + " [Receive Cancelled]")
400
+ self.response_event.set()
401
+ except Exception as e:
402
+ print(f"Error receiving responses: {e}")
403
+ traceback.print_exc()
404
+ # Signal completion with error to unblock handler
405
+ await self.response_audio_queue.put(b"") # Empty audio
406
+ await self.response_text_queue.put(f"Error receiving response: {e}")
407
+ self.response_event.set()
408
+ # Pause on significant errors to avoid spamming logs
409
+ if isinstance(e, (google.api_core.exceptions.GoogleAPICallError, ConnectionError)):
410
+ print("Connection error in receive_responses, pausing...")
411
+ self.connection_status = f"Receive Error: {e}" # Update status
412
+ await asyncio.sleep(5)
413
+
414
+
415
+ async def send_message_and_wait_for_response(self, text):
416
+ """ Puts text on input queue and waits for the response event """
417
+ if not self.session or self.connection_status != "Connected":
418
+ return f"Error: Not connected ({self.connection_status}).", None
419
+
420
+ await self.text_input_queue.put(text)
421
+ print("Waiting for response event...")
422
  try:
423
+ # Wait for the event with a timeout
424
+ await asyncio.wait_for(self.response_event.wait(), timeout=60.0) # 60 second timeout
425
+ print("Response event received.")
426
+ except asyncio.TimeoutError:
427
+ print("Timeout waiting for Gemini response.")
428
+ return "Error: Timeout waiting for response.", None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
429
  except Exception as e:
430
+ print(f"Error waiting for response event: {e}")
431
+ return f"Error waiting for response: {e}", None
432
 
433
+ # Retrieve results from response queues
434
+ # Use get_nowait as the event guarantees items are present
435
+ try:
436
+ audio_data = self.response_audio_queue.get_nowait()
437
+ text_response = self.response_text_queue.get_nowait()
438
+ self.response_audio_queue.task_done()
439
+ self.response_text_queue.task_done()
440
+ except asyncio.QueueEmpty:
441
+ print("Error: Response queues were empty after event was set.")
442
+ return "Internal Error: Response queues empty.", None
443
+ except Exception as e:
444
+ print(f"Error retrieving from response queues: {e}")
445
+ return f"Internal Error: {e}", None
446
 
 
 
 
 
 
 
 
 
 
 
 
447
 
448
+ return text_response, audio_data
449
 
450
+ async def run_main_loop(self):
451
+ """ The main async method to establish connection and manage tasks """
452
+ global background_tasks
453
+ if not self._initialize_client_and_config():
454
+ print("Initialization failed, cannot connect.")
455
+ self.connection_status = "Connection Failed: Initialization error."
456
+ await run_coro_in_background_loop(update_status(self.connection_status))
457
+ return # Stop if client setup failed
458
 
 
459
  try:
460
+ print(f"Attempting to connect to Gemini model: {MODEL}...")
461
+ self.connection_status = "Connecting..."
462
+ await run_coro_in_background_loop(update_status(self.connection_status))
463
+
464
+ # --- Connect to Gemini ---
465
+ # Use a timeout for the connection attempt itself
466
+ try:
467
+ # The actual connection happens within the context manager entry
468
+ async with asyncio.wait_for(
469
+ self.client.aio.live.connect(model=MODEL, config=self.config),
470
+ timeout=30.0 # 30 second timeout for connection
471
+ ) as session:
472
+ self.session = session
473
+ self.connection_status = "Connected"
474
+ print("Session established successfully.")
475
+ await run_coro_in_background_loop(update_status(self.connection_status))
476
+
477
+ # Queue for mic/video data TO Gemini
478
+ self.out_queue = asyncio.Queue(maxsize=20)
479
+
480
+ # --- Create and manage background tasks ---
481
+ tasks = set()
482
+ tasks.add(asyncio.create_task(self.process_text_inputs(), name="process_text_inputs"))
483
+ tasks.add(asyncio.create_task(self.receive_responses(), name="receive_responses"))
484
+
485
+ if self.video_mode != "none":
486
+ tasks.add(asyncio.create_task(self.send_realtime(), name="send_realtime"))
487
+ if self.video_mode == "camera":
488
+ print("Starting camera input task...")
489
+ tasks.add(asyncio.create_task(self.get_frames(), name="get_frames"))
490
+ elif self.video_mode == "screen":
491
+ print("Starting screen capture task...")
492
+ tasks.add(asyncio.create_task(self.get_screen(), name="get_screen"))
493
+ # Option to add microphone input alongside video if needed
494
+ # print("Starting microphone input task...")
495
+ # tasks.add(asyncio.create_task(self.listen_audio(), name="listen_audio"))
496
+
497
+ background_tasks.update(tasks)
498
+
499
+ # Keep running while connected and tasks are active
500
+ # We primarily rely on receive_responses to detect session closure/errors
501
+ while self.connection_status == "Connected" and self.session:
502
+ await asyncio.sleep(0.5) # Check status periodically
503
+
504
+ print("Exiting main run loop (disconnected or error).")
505
+
506
+ except asyncio.TimeoutError:
507
+ print("CONNECTION FAILED: Timeout while trying to connect.")
508
+ self.connection_status = "Connection Failed: Timeout"
509
+ await run_coro_in_background_loop(update_status(self.connection_status))
510
+ except google.api_core.exceptions.PermissionDenied as e:
511
+ print(f"CONNECTION FAILED: Permission Denied. Check API key and permissions. {e}")
512
+ self.connection_status = "Connection Failed: Permission Denied"
513
+ await run_coro_in_background_loop(update_status(f"{self.connection_status}. Check API Key."))
514
+ except google.api_core.exceptions.InvalidArgument as e:
515
+ print(f"CONNECTION FAILED: Invalid Argument. Check model name ('{MODEL}') and config. {e}")
516
+ self.connection_status = f"Connection Failed: Invalid Argument (Model/Config?)"
517
+ await run_coro_in_background_loop(update_status(f"{self.connection_status} Details: {e}"))
518
+ except Exception as e: # Catch other potential connection errors
519
+ print(f"CONNECTION FAILED: An unexpected error occurred during connection. {e}")
520
+ traceback.print_exc()
521
+ self.connection_status = f"Connection Failed: {e}"
522
+ await run_coro_in_background_loop(update_status(self.connection_status))
523
+
524
+ except asyncio.CancelledError:
525
+ print("run_main_loop task cancelled.")
526
+ self.connection_status = "Disconnected (Cancelled)"
527
  except Exception as e:
528
+ print(f"Error in AudioLoop run_main_loop: {e}")
529
  traceback.print_exc()
530
+ self.connection_status = f"Runtime Error: {e}"
531
+ await run_coro_in_background_loop(update_status(self.connection_status))
532
+ finally:
533
+ print("Cleaning up audio loop resources...")
534
+ final_status = self.connection_status # Capture status before changing
535
+ if final_status == "Connected": # If loop exited cleanly but was connected
536
+ final_status = "Disconnected"
537
+ self.connection_status = "Disconnected" # Ensure status is updated
538
+
539
+ # Cancel remaining background tasks associated with *this* instance
540
+ tasks_to_cancel = list(background_tasks) # Iterate over a copy
541
+ background_tasks.clear() # Clear global set for this instance
542
+ for task in tasks_to_cancel:
543
+ if task and not task.done():
544
+ task.cancel()
545
+ print(f"Cancelled task: {task.get_name()}")
546
+
547
+ # Close PyAudio stream if open (managed by listen_audio task's finally now)
548
+ # if self.audio_stream and not self.audio_stream.is_stopped(): ... handled in listen_audio
549
+
550
+ # Reset session and client
551
+ self.session = None # Important to signal disconnection
552
+ self.client = None
553
+ self.out_queue = None # Clear queue reference
554
+
555
+ print("AudioLoop run finished.")
556
+ await run_coro_in_background_loop(update_status(final_status)) # Update Gradio status
557
+
558
+ async def disconnect(self):
559
+ """Initiates the disconnection process."""
560
+ print("Disconnect requested.")
561
+ if self.session:
562
+ # The run_main_loop should detect the session closing or errors.
563
+ # Explicitly closing the session might be possible depending on the SDK,
564
+ # but often letting the context manager exit is the intended way.
565
+ # For now, just update status and let the main loop handle cleanup.
566
+ print("Setting status to Disconnecting...")
567
+ self.connection_status = "Disconnecting"
568
+ # Signal tasks relying on session to stop
569
+ self.session = None # This should help loops terminate
570
+ # Put None on input queue to stop text processor if waiting
571
+ await self.text_input_queue.put(None)
572
+ else:
573
+ self.connection_status = "Disconnected"
574
+
575
+ # Cleanup might happen in run_main_loop's finally block
576
+ await run_coro_in_background_loop(update_status("Disconnected"))
577
+
578
+
579
+ # --- Helper Functions ---
580
+
581
+ def start_asyncio_loop():
582
+ """Starts the asyncio event loop in a separate thread."""
583
+ global background_loop, stop_background_loop
584
+ stop_background_loop = False
585
+ background_loop = asyncio.new_event_loop()
586
+ asyncio.set_event_loop(background_loop)
587
+ print("Background asyncio loop starting.")
588
+ try:
589
+ # Run until explicitly stopped
590
+ while not stop_background_loop:
591
+ background_loop.call_later(0.1, background_loop.stop) # Wake up periodically
592
+ background_loop.run_forever()
593
+ if stop_background_loop:
594
+ print("Stop signal received, exiting run_forever loop.")
595
+ break # Exit outer loop if stopped
596
+
597
+ # Run pending tasks before closing
598
+ print("Running pending tasks before closing loop...")
599
+ pending = asyncio.all_tasks(loop=background_loop)
600
+ if pending:
601
+ background_loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
602
+
603
+ except Exception as e:
604
+ print(f"Error in background loop: {e}")
605
+ traceback.print_exc()
606
+ finally:
607
+ if background_loop.is_running():
608
+ background_loop.stop()
609
+ print("Closing background loop...")
610
+ # Give tasks a moment to finish cancelling
611
+ time.sleep(0.5)
612
+ background_loop.close()
613
+ print("Background asyncio loop stopped.")
614
+ background_loop = None # Clear global ref
615
+
616
+ def stop_asyncio_loop():
617
+ """Signals the background asyncio loop to stop."""
618
+ global stop_background_loop, background_loop
619
+ print("Signalling background loop to stop...")
620
+ stop_background_loop = True
621
+ if background_loop and background_loop.is_running():
622
+ # This helps wake up the loop if it's idle
623
+ background_loop.call_soon_threadsafe(background_loop.stop)
624
+
625
+ async def run_coro_in_background_loop(coro):
626
+ """Submits a coroutine to the background event loop and returns its future."""
627
+ global background_loop
628
+ if background_loop and background_loop.is_running() and not stop_background_loop:
629
+ try:
630
+ # Use run_coroutine_threadsafe for thread safety
631
+ future = asyncio.run_coroutine_threadsafe(coro, background_loop)
632
+ return future # Return the concurrent.futures.Future
633
+ except RuntimeError as e:
634
+ # Handle cases where the loop might be shutting down
635
+ print(f"Error submitting coroutine (loop shutting down?): {e}")
636
+ future = asyncio.Future()
637
+ future.set_exception(e)
638
+ return None # Indicate failure to schedule
639
+ except Exception as e:
640
+ print(f"Unexpected error submitting coroutine: {e}")
641
+ future = asyncio.Future()
642
+ future.set_exception(e)
643
+ return None
644
+ else:
645
+ print("Error: Background asyncio loop not running or stopping.")
646
+ # Create a dummy future that resolves immediately with an error?
647
+ # Or just return None to indicate failure
648
+ return None
649
+
650
+ def format_audio_for_gradio(pcm_data):
651
+ """Converts raw PCM data to a format Gradio's Audio component can use."""
652
+ if not pcm_data:
653
+ # print("No audio data received to format.")
654
+ return None
655
+ try:
656
+ # Ensure PyAudio is initialized to get sample width
657
+ if not pya: initialize_py_audio()
658
+ if not pya: return None # Could not initialize
659
+
660
+ # Create a WAV file in memory
661
+ wav_buffer = io.BytesIO()
662
+ with wave.open(wav_buffer, 'wb') as wf:
663
+ wf.setnchannels(CHANNELS)
664
+ wf.setsampwidth(pya.get_sample_size(FORMAT)) # Should be 2 for paInt16
665
+ wf.setframerate(RECEIVE_SAMPLE_RATE)
666
+ wf.writeframes(pcm_data)
667
+ wav_buffer.seek(0)
668
+
669
+ # Read the WAV data back and convert to numpy array
670
+ with wave.open(wav_buffer, 'rb') as wf_read:
671
+ n_frames = wf_read.getnframes()
672
+ data = wf_read.readframes(n_frames)
673
+ dtype = np.int16 # Based on pyaudio.paInt16
674
+ numpy_array = np.frombuffer(data, dtype=dtype)
675
+
676
+ # print(f"Formatted audio: {len(numpy_array)} samples, rate {RECEIVE_SAMPLE_RATE}")
677
+ # Return tuple for Gradio Audio: (sample_rate, numpy_array)
678
+ return (RECEIVE_SAMPLE_RATE, numpy_array)
679
+ except Exception as e:
680
+ print(f"Error formatting audio: {e}")
681
+ traceback.print_exc()
682
+ return None
683
+
684
+ def initialize_py_audio():
685
+ global pya
686
+ if pya is None:
687
+ try:
688
+ print("Initializing PyAudio...")
689
+ pya = pyaudio.PyAudio()
690
+ print("PyAudio initialized.")
691
+ return True
692
+ except Exception as e:
693
+ print(f"Failed to initialize PyAudio: {e}")
694
+ pya = None
695
+ return False
696
+ return True # Already initialized
697
+
698
+ def terminate_py_audio():
699
+ global pya
700
+ if pya:
701
+ print("Terminating PyAudio...")
702
+ try:
703
+ pya.terminate()
704
+ except Exception as e:
705
+ print(f"Error terminating PyAudio: {e}")
706
+ finally:
707
+ pya = None
708
+ print("PyAudio terminated.")
709
+
710
+
711
+ # --- Gradio Interface and Handlers ---
712
+
713
+ # Placeholder for status updates - needs to run in the background loop
714
+ async def update_status(new_status: str):
715
+ """Coroutine to update the Gradio status component."""
716
+ # This function itself doesn't directly update Gradio.
717
+ # It relies on being scheduled and the Gradio handler returning the value.
718
+ # However, for internal logging, we print here.
719
+ print(f"Status Update (async): {new_status}")
720
+ # The actual update happens when the calling handler returns this status
721
+ # For direct async updates, you'd need Gradio's streaming features if applicable.
722
+
723
+
724
+ def handle_connect(api_key, voice_name, video_mode):
725
+ """Handles the 'Connect' button click."""
726
+ global audio_loop_instance, background_loop, background_thread
727
+ print("\n--- Connect Button Clicked ---")
728
+ status = "Connecting..."
729
+ yield status, None, None # Initial status update
730
+
731
+ if not api_key:
732
+ yield "Error: Please enter a Gemini API key.", None, None
733
+ return
734
+ if audio_loop_instance and audio_loop_instance.connection_status not in ["Disconnected", "Initialization Error", "Connection Failed: Timeout", "Connection Failed: Permission Denied", "Connection Failed: Invalid Argument (Model/Config?)"]:
735
+ yield f"Already connected or connecting ({audio_loop_instance.connection_status}). Disconnect first.", None, None
736
+ return
737
+
738
+ # Start background loop thread if not running
739
+ if not background_thread or not background_thread.is_alive():
740
+ print("Starting background thread...")
741
+ background_thread = threading.Thread(target=start_asyncio_loop, daemon=True)
742
+ background_thread.start()
743
+ time.sleep(0.5) # Give the loop a moment to start
744
+
745
+ # Ensure PyAudio is initialized
746
+ if not initialize_py_audio():
747
+ yield "Error: Failed to initialize audio system.", None, None
748
+ return
749
+
750
+ print(f"Attempting to connect with voice: {voice_name}, video: {video_mode}")
751
+ audio_loop_instance = GradioAudioLoop(video_mode=video_mode, api_key=api_key, voice_name=voice_name)
752
+
753
+ # Run the audio loop's main logic in the background asyncio loop
754
+ connect_future = run_coro_in_background_loop(audio_loop_instance.run_main_loop())
755
+ if not connect_future:
756
+ audio_loop_instance = None # Cleanup if scheduling failed
757
+ yield "Error: Failed to schedule connection task.", None, None
758
+ return
759
+
760
+ # Don't block Gradio here. The run_main_loop will update status via update_status coroutine calls.
761
+ # We yield the initial "Connecting..." status. Subsequent updates handled async.
762
+ # We might need a short sleep/check or rely purely on async updates. Let's rely on async updates.
763
+ # yield "Connecting... Waiting for confirmation.", None, None
764
+ # Add a small delay to allow the initial connection steps to run and update status
765
+ await asyncio.sleep(1) # Use await if in async context, time.sleep otherwise? Gradio handler might be sync.
766
+ # Use time.sleep in sync Gradio handler context
767
+ time.sleep(1.5)
768
+
769
+ # The final status will be updated by the run_main_loop's finally block or error handling
770
+ # Check the instance status directly after a short wait
771
+ if audio_loop_instance:
772
+ current_status = audio_loop_instance.connection_status
773
+ yield current_status, None, None
774
  else:
775
+ # This case shouldn't happen if scheduling worked, but as a fallback
776
+ yield "Error: Connection process failed unexpectedly.", None, None
777
+
778
+
779
+ def handle_disconnect():
780
+ """Handles the 'Disconnect' button click."""
781
+ global audio_loop_instance
782
+ print("\n--- Disconnect Button Clicked ---")
783
+ status = "Disconnecting..."
784
+ yield status, None, None # Initial status update
785
+
786
+ if not audio_loop_instance or audio_loop_instance.connection_status == "Disconnected":
787
+ yield "Already disconnected.", None, None
788
+ return
789
+
790
+ # Schedule the disconnect coroutine
791
+ disconnect_future = run_coro_in_background_loop(audio_loop_instance.disconnect())
792
+ if not disconnect_future:
793
+ yield "Error: Failed to schedule disconnection task.", None, None
794
+ return
795
+
796
+ try:
797
+ # Wait briefly for disconnect to initiate
798
+ disconnect_future.result(timeout=5.0)
799
+ status = "Disconnected"
800
+ except TimeoutError:
801
+ status = "Disconnect timeout. Check logs."
802
+ print("Timeout waiting for disconnect confirmation.")
803
+ except Exception as e:
804
+ status = f"Error during disconnect: {e}"
805
+ print(f"Error during disconnect future result: {e}")
806
+
807
+ # Clean up global instance
808
+ audio_loop_instance = None
809
+
810
+ # Optionally terminate PyAudio here or let atexit handle it
811
+ # terminate_py_audio() # Can cause issues if connect is clicked again quickly
812
+
813
+ yield status, None, None # Final status update
814
+
815
+
816
+ def handle_send_message(message):
817
+ """Handles sending a text message."""
818
+ global audio_loop_instance
819
+ print(f"\n--- Sending Message: {message[:30]}... ---")
820
+
821
+ if not audio_loop_instance or audio_loop_instance.connection_status != "Connected":
822
+ yield "Error: Not connected. Cannot send message.", None # Update status text, no audio
823
+ return
824
+
825
+ if not message or message.strip() == "":
826
+ yield "Cannot send empty message.", None
827
+ return
828
+
829
+ # Clear previous outputs
830
+ yield "Sending message...", None # Update status, clear audio
831
+
832
+ # Schedule the send/receive task and wait for its result
833
+ response_future = run_coro_in_background_loop(
834
+ audio_loop_instance.send_message_and_wait_for_response(message)
835
+ )
836
+
837
+ if not response_future:
838
+ yield "Error: Failed to schedule message task.", None
839
+ return
840
+
841
+ text_response = "Error: No response received."
842
+ audio_output = None
843
+ try:
844
+ # Wait for the background task to complete and return results
845
+ # Adjust timeout as needed
846
+ result_text, result_audio_data = response_future.result(timeout=60.0) # Wait up to 60 secs
847
+
848
+ text_response = result_text
849
+ if result_audio_data:
850
+ print(f"Received audio data ({len(result_audio_data)} bytes), formatting...")
851
+ audio_output = format_audio_for_gradio(result_audio_data)
852
+ if audio_output is None:
853
+ print("Failed to format audio for Gradio.")
854
+ text_response += " [Audio Formatting Error]"
855
+ else:
856
+ print("No audio data received in response.")
857
+ text_response += " [No Audio Received]"
858
+
859
+
860
+ except TimeoutError:
861
+ print("Timeout waiting for response future.")
862
+ text_response = "Error: Timeout waiting for Gemini response."
863
+ # Optionally try to cancel the future if possible/needed
864
+ except Exception as e:
865
+ print(f"Error getting result from response future: {e}")
866
+ traceback.print_exc()
867
+ text_response = f"Error processing response: {e}"
868
+
869
+ print(f"Final Text Response: {text_response}")
870
+ print(f"Final Audio Output: {'Present' if audio_output else 'None'}")
871
+ yield text_response, audio_output
872
+
873
+
874
+ # --- Gradio Interface Definition ---
875
+ with gr.Blocks(theme=gr.themes.Soft()) as demo:
876
+ gr.Markdown("# Gemini LiveConnect TTS Interface")
877
+ gr.Markdown(f"Using Model: `{MODEL}`")
878
+
879
+ with gr.Row():
880
+ api_key_input = gr.Textbox(label="Gemini API Key", type="password", placeholder="Enter your API key")
881
+ voice_select = gr.Dropdown(label="Select Voice", choices=AVAILABLE_VOICES, value=DEFAULT_VOICE)
882
+ video_mode_select = gr.Radio(label="Video Input (Optional)", choices=["none", "camera", "screen"], value=DEFAULT_VIDEO_MODE, visible=False) # Hidden for now, focus on TTS
883
+
884
+ with gr.Row():
885
+ connect_button = gr.Button("Connect")
886
+ disconnect_button = gr.Button("Disconnect")
887
+
888
+ status_output = gr.Textbox(label="Status", value="Disconnected", interactive=False)
889
+
890
+ with gr.Column():
891
+ message_input = gr.Textbox(label="Your Message", placeholder="Type your message here...")
892
+ send_button = gr.Button("Send Message")
893
+
894
+ with gr.Column():
895
+ gr.Markdown("## Response")
896
+ response_text_output = gr.Textbox(label="Gemini Text", interactive=False)
897
+ audio_output = gr.Audio(label="Gemini Audio", type="numpy", interactive=False) # Use numpy for (rate, data) tuple
898
+
899
+ # --- Event Handlers ---
900
+ connect_button.click(
901
+ fn=handle_connect,
902
+ inputs=[api_key_input, voice_select, video_mode_select],
903
+ outputs=[status_output, response_text_output, audio_output] # Clear outputs on connect
904
+ )
905
+ disconnect_button.click(
906
+ fn=handle_disconnect,
907
+ inputs=[],
908
+ outputs=[status_output, response_text_output, audio_output] # Clear outputs on disconnect
909
+ )
910
+ send_button.click(
911
+ fn=handle_send_message,
912
+ inputs=[message_input],
913
+ outputs=[response_text_output, audio_output]
914
+ )
915
+ # Allow sending message by pressing Enter in the textbox
916
+ message_input.submit(
917
+ fn=handle_send_message,
918
+ inputs=[message_input],
919
+ outputs=[response_text_output, audio_output]
920
+ )
921
 
922
 
923
+ # --- Cleanup Function ---
924
+ def cleanup():
925
+ print("Running cleanup...")
926
+ global audio_loop_instance
927
+
928
+ # Disconnect if connected
929
+ if audio_loop_instance and audio_loop_instance.connection_status != "Disconnected":
930
+ print("Disconnecting during cleanup...")
931
+ disconnect_future = run_coro_in_background_loop(audio_loop_instance.disconnect())
932
+ if disconnect_future:
933
+ try:
934
+ disconnect_future.result(timeout=5.0)
935
+ print("Disconnect successful during cleanup.")
936
+ except Exception as e:
937
+ print(f"Error during cleanup disconnect: {e}")
938
+ audio_loop_instance = None
939
+
940
+ # Signal background loop to stop
941
+ stop_asyncio_loop()
942
+
943
+ # Wait for background thread to finish
944
+ if background_thread and background_thread.is_alive():
945
+ print("Waiting for background thread to join...")
946
+ background_thread.join(timeout=5.0)
947
+ if background_thread.is_alive():
948
+ print("Warning: Background thread did not exit cleanly.")
949
+
950
+ # Terminate PyAudio
951
+ terminate_py_audio()
952
+ print("Cleanup finished.")
953
+
954
+ # Register cleanup function to run on exit
955
+ atexit.register(cleanup)
956
+
957
+ # --- Main Execution ---
958
  if __name__ == "__main__":
959
+ # Start the background thread immediately (optional, connect can start it too)
960
+ # print("Starting background thread on launch...")
961
+ # background_thread = threading.Thread(target=start_asyncio_loop, daemon=True)
962
+ # background_thread.start()
963
+
964
+ print("Launching Gradio Interface...")
965
+ # Share=True to create a public link (remove if not needed)
966
+ demo.queue().launch(share=False)
967
+
968
+ # Keep main thread alive while Gradio is running (Gradio launch blocks)
969
+ print("Gradio Interface closed.")
970
+ # Cleanup is handled by atexit