Athspi commited on
Commit
b8a34b4
·
verified ·
1 Parent(s): 843feb6

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +50 -960
app.py CHANGED
@@ -1,970 +1,60 @@
1
- # File: app.py
2
-
3
- import gradio as gr
4
- import asyncio
5
- import base64
6
- import io
7
- import traceback
8
- import cv2
9
- import pyaudio
10
- import PIL.Image
11
- import mss
12
- import google.generativeai as genai
13
- from google.generativeai import types
14
- import google.api_core.exceptions
15
- import wave
16
- import numpy as np
17
- import threading
18
- import queue
19
  import os
20
- import time
21
- import tempfile
22
- import atexit # For cleanup
23
-
24
- # --- Constants ---
25
- FORMAT = pyaudio.paInt16
26
- CHANNELS = 1
27
- SEND_SAMPLE_RATE = 16000
28
- RECEIVE_SAMPLE_RATE = 24000 # Gemini outputs at 24kHz
29
- CHUNK_SIZE = 1024
30
- MODEL = "models/gemini-2.0-flash-exp" # Use the requested experimental model
31
- DEFAULT_VIDEO_MODE = "none"
32
- AVAILABLE_VOICES = ["Puck", "Charon", "Kore", "Fenrir", "Aoede"]
33
- DEFAULT_VOICE = "Puck"
34
- SYSTEM_INSTRUCTION_TEXT = "Answer user ask replay same thing user say no other word explain "
35
-
36
- # --- Global State ---
37
- audio_loop_instance = None
38
- background_tasks = set()
39
- background_loop = None # Event loop for the background thread
40
- pya = None # Initialize PyAudio globally later
41
- background_thread = None # Keep track of the thread
42
- stop_background_loop = False # Flag to signal loop termination
43
-
44
- # --- Original AudioLoop Class Methods (Included for potential future use) ---
45
- # Note: We inherit from the original structure for clarity but override key methods
46
- class OriginalAudioLoop:
47
- """Base class structure placeholder - includes relevant methods from original script"""
48
- def __init__(self, video_mode=DEFAULT_VIDEO_MODE):
49
- self.video_mode = video_mode
50
- self.out_queue = None # Queue for data *to* Gemini (mic audio, images)
51
- self.session = None
52
- self.audio_stream = None # Mic input stream
53
-
54
- def _get_frame(self, cap):
55
- ret, frame = cap.read()
56
- if not ret: return None
57
- frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
58
- img = PIL.Image.fromarray(frame_rgb)
59
- img.thumbnail([1024, 1024])
60
- image_io = io.BytesIO()
61
- img.save(image_io, format="jpeg")
62
- image_io.seek(0)
63
- mime_type = "image/jpeg"
64
- image_bytes = image_io.read()
65
- return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}
66
-
67
- async def get_frames(self):
68
- cap = None
69
- try:
70
- print("Attempting to open camera...")
71
- cap = await asyncio.to_thread(cv2.VideoCapture, 0)
72
- if not cap.isOpened():
73
- print("Error: Could not open camera.")
74
- # Signal error back to Gradio? For now, just log and exit task.
75
- await run_coro_in_background_loop(update_status("Error: Could not open camera."))
76
- return
77
-
78
- print("Camera opened successfully.")
79
- while True:
80
- if not self.session: # Stop if disconnected
81
- print("get_frames: Session closed, stopping camera task.")
82
- break
83
- # print("Reading frame from camera...")
84
- frame = await asyncio.to_thread(self._get_frame, cap)
85
- if frame is None:
86
- # print("Warning: Failed to get frame from camera.")
87
- await asyncio.sleep(0.1) # Avoid busy loop
88
- continue # Skip putting None in queue
89
-
90
- if self.out_queue:
91
- # print("Putting camera frame in queue.")
92
- await self.out_queue.put(frame)
93
- await asyncio.sleep(1.0) # Send frame every second
94
-
95
- except asyncio.CancelledError:
96
- print("get_frames task cancelled.")
97
- except Exception as e:
98
- print(f"Error in get_frames: {e}")
99
- await run_coro_in_background_loop(update_status(f"Camera Error: {e}"))
100
- finally:
101
- if cap and cap.isOpened():
102
- print("Releasing camera.")
103
- await asyncio.to_thread(cap.release)
104
- print("Camera task finished.")
105
-
106
-
107
- def _get_screen(self):
108
- try:
109
- with mss.mss() as sct:
110
- # Attempt to grab the primary monitor (often index 1 in mss.monitors)
111
- monitor = sct.monitors[1]
112
- sct_img = sct.grab(monitor)
113
- img = PIL.Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX") # Handle BGRA
114
-
115
- image_io = io.BytesIO()
116
- img.thumbnail([1024, 1024]) # Resize before saving
117
- img.save(image_io, format="jpeg")
118
- image_io.seek(0)
119
-
120
- mime_type = "image/jpeg"
121
- image_bytes = image_io.read()
122
- return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}
123
- except IndexError:
124
- print("Error capturing screen: Could not find monitor at index 1. Trying index 0.")
125
- try: # Fallback to monitor 0 (usually includes all screens)
126
- with mss.mss() as sct:
127
- monitor = sct.monitors[0]
128
- sct_img = sct.grab(monitor)
129
- img = PIL.Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX")
130
- image_io = io.BytesIO()
131
- img.thumbnail([1024, 1024])
132
- img.save(image_io, format="jpeg")
133
- image_io.seek(0)
134
- mime_type = "image/jpeg"
135
- image_bytes = image_io.read()
136
- return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}
137
- except Exception as e_fallback:
138
- print(f"Error capturing screen (fallback monitor 0): {e_fallback}")
139
- return None
140
- except Exception as e:
141
- print(f"Error capturing screen: {e}")
142
- return None
143
-
144
- async def get_screen(self):
145
- while True:
146
- if not self.session: # Stop if disconnected
147
- print("get_screen: Session closed, stopping screen task.")
148
- break
149
- # print("Capturing screen...")
150
- frame = await asyncio.to_thread(self._get_screen)
151
- if frame is None:
152
- print("Warning: Failed to capture screen.")
153
- await asyncio.sleep(1.0) # Wait before retrying if error occurred
154
- continue # Skip putting None in queue
155
-
156
- if self.out_queue:
157
- # print("Putting screen frame in queue.")
158
- await self.out_queue.put(frame)
159
-
160
- await asyncio.sleep(1.0) # Send screen frame every second
161
-
162
- async def send_realtime(self):
163
- """Sends microphone audio or video frames from the out_queue to Gemini."""
164
- while True:
165
- if not self.session or not self.out_queue:
166
- # Wait if session/queue not ready or if disconnected
167
- await asyncio.sleep(0.1)
168
- if not self.session: # Check again after sleep if disconnected
169
- print("send_realtime: Session closed, stopping task.")
170
- break
171
- continue
172
-
173
- try:
174
- msg = await asyncio.wait_for(self.out_queue.get(), timeout=1.0) # Wait with timeout
175
- if self.session: # Check again in case session closed while waiting
176
- # print(f"Sending {msg.get('mime_type', 'unknown type')} to Gemini...")
177
- await self.session.send(input=msg)
178
- self.out_queue.task_done()
179
- except asyncio.TimeoutError:
180
- # print("send_realtime: Queue empty, waiting...")
181
- continue # No message in queue, loop again
182
- except asyncio.CancelledError:
183
- print("send_realtime task cancelled.")
184
- break
185
- except Exception as e:
186
- print(f"Error in send_realtime: {e}")
187
- await run_coro_in_background_loop(update_status(f"Send Error: {e}"))
188
- # Avoid continuous errors if session is bad
189
- if isinstance(e, (google.api_core.exceptions.GoogleAPICallError, ConnectionError)):
190
- print("Connection error in send_realtime, pausing...")
191
- await asyncio.sleep(5)
192
-
193
-
194
- async def listen_audio(self):
195
- """Listens to microphone and puts audio chunks onto the out_queue."""
196
- global pya
197
- if not pya:
198
- print("Error: PyAudio not initialized in listen_audio.")
199
- await run_coro_in_background_loop(update_status("Error: Audio system not ready."))
200
- return
201
-
202
- mic_info = None
203
- stream = None
204
- try:
205
- print("Attempting to open microphone...")
206
- mic_info = await asyncio.to_thread(pya.get_default_input_device_info)
207
- stream = await asyncio.to_thread(
208
- pya.open,
209
- format=FORMAT,
210
- channels=CHANNELS,
211
- rate=SEND_SAMPLE_RATE,
212
- input=True,
213
- input_device_index=mic_info["index"],
214
- frames_per_buffer=CHUNK_SIZE,
215
- )
216
- self.audio_stream = stream # Store reference for cleanup
217
- print("Microphone stream opened.")
218
- if __debug__:
219
- kwargs = {"exception_on_overflow": False}
220
- else:
221
- kwargs = {}
222
-
223
- while True:
224
- if not self.session: # Stop if disconnected
225
- print("listen_audio: Session closed, stopping microphone task.")
226
- break
227
- try:
228
- # print("Reading from microphone...")
229
- data = await asyncio.to_thread(stream.read, CHUNK_SIZE, **kwargs)
230
- if self.out_queue:
231
- # print("Putting microphone data in queue.")
232
- await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})
233
- except IOError as e:
234
- # This often happens if the buffer overflows or the stream is closed abruptly
235
- # print(f"PyAudio read error (possible overflow or stream closed): {e}")
236
- await asyncio.sleep(0.05) # Short pause before trying again
237
- except asyncio.CancelledError:
238
- print("listen_audio task cancelled.")
239
- break
240
-
241
- except OSError as e:
242
- print(f"Error opening microphone: {e}. Is a microphone connected and accessible?")
243
- await run_coro_in_background_loop(update_status(f"Mic Error: {e}"))
244
- except Exception as e:
245
- print(f"Error in listen_audio: {e}")
246
- traceback.print_exc()
247
- await run_coro_in_background_loop(update_status(f"Mic Error: {e}"))
248
- finally:
249
- if stream:
250
- print("Stopping and closing microphone stream.")
251
- await asyncio.to_thread(stream.stop_stream)
252
- await asyncio.to_thread(stream.close)
253
- self.audio_stream = None # Clear reference
254
- print("Microphone stream closed.")
255
-
256
-
257
- # --- Gradio Specific Audio Loop ---
258
- class GradioAudioLoop(OriginalAudioLoop): # Inherit and modify/add methods
259
- def __init__(self, video_mode=DEFAULT_VIDEO_MODE, api_key=None, voice_name=DEFAULT_VOICE):
260
- super().__init__(video_mode)
261
- self.api_key = api_key
262
- self.voice_name = voice_name
263
- self.client = None
264
- self.config = None
265
- self.connection_status = "Disconnected" # Internal status
266
-
267
- # Queues for communication between Gradio handler and background loop
268
- self.text_input_queue = asyncio.Queue()
269
- self.response_text_queue = asyncio.Queue()
270
- self.response_audio_queue = asyncio.Queue()
271
- self.response_event = asyncio.Event() # Signal when response is ready
272
-
273
- # Buffers for accumulating response data within a turn
274
- self.current_audio_buffer = io.BytesIO()
275
- self.current_text_response = ""
276
 
277
- def _initialize_client_and_config(self):
278
- """Initialize Gemini client and configuration."""
279
- if not self.api_key:
280
- raise ValueError("API key is not set.")
281
- try:
282
- # Use v1beta for experimental models if needed, adjust if stable
283
- # http_options={"api_version": "v1beta"} # Try if v1alpha causes issues
284
- # Check if GEMINI_API_KEY env var exists, otherwise use provided key
285
- api_key_to_use = os.getenv("GEMINI_API_KEY", self.api_key)
286
- if not api_key_to_use:
287
- raise ValueError("No API key provided or found in GEMINI_API_KEY environment variable.")
288
 
289
- # Use Client instead of genai.configure if passing key directly
290
- print("Initializing Gemini Client...")
291
- self.client = genai.Client(api_key=api_key_to_use)
292
 
293
- print(f"Setting up LiveConnectConfig with voice: {self.voice_name}")
294
- self.config = types.LiveConnectConfig(
295
- response_modalities=["audio", "text"], # Get both audio and text
296
- speech_config=types.SpeechConfig(
297
- voice_config=types.VoiceConfig(
298
- prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=self.voice_name) # Use selected voice
299
- )
300
- ),
301
- system_instruction=types.Content(
302
- parts=[types.Part.from_text(text=SYSTEM_INSTRUCTION_TEXT)],
303
- role="user" # System instructions often role='user'
304
- ),
305
  )
306
- print("Gemini client and config initialized successfully.")
307
- self.connection_status = "Initialized"
308
- return True # Indicate success
309
- except Exception as e:
310
- print(f"Error initializing Gemini client: {e}")
311
- self.client = None
312
- self.config = None
313
- self.connection_status = f"Initialization Error: {e}"
314
- return False # Indicate failure
315
-
316
-
317
- async def process_text_inputs(self):
318
- """ Task to wait for text input from Gradio and send it to Gemini """
319
- while True:
320
- try:
321
- # Wait indefinitely for an item from the queue
322
- text_to_send = await self.text_input_queue.get()
323
-
324
- if text_to_send is None: # Use None as a signal to stop
325
- print("Stopping text input processing.")
326
- break
327
-
328
- if self.session and self.connection_status == "Connected":
329
- print(f"Sending text to Gemini: {text_to_send[:50]}...")
330
- # Reset response holders before sending new message
331
- self.current_audio_buffer = io.BytesIO()
332
- self.current_text_response = ""
333
- self.response_event.clear()
334
- # Send text and indicate end of turn
335
- await self.session.send(input=text_to_send or ".", end_of_turn=True)
336
- print("Text sent, waiting for response...")
337
- else:
338
- print(f"Warning: Cannot send text. Session not active or status is {self.connection_status}.")
339
- # Signal back an error to the waiting Gradio handler
340
- await self.response_text_queue.put(f"Error: Not connected or connection issue ({self.connection_status}). Cannot send message.")
341
- await self.response_audio_queue.put(b"") # Empty audio
342
- self.response_event.set() # Unblock the handler
343
-
344
- self.text_input_queue.task_done() # Mark task as done
345
-
346
- except asyncio.CancelledError:
347
- print("process_text_inputs task cancelled.")
348
- break
349
- except Exception as e:
350
- print(f"Error in process_text_inputs: {e}")
351
- # Signal error back to the waiting Gradio handler
352
- await self.response_text_queue.put(f"Error sending message: {e}")
353
- await self.response_audio_queue.put(b"")
354
- self.response_event.set()
355
- # Avoid loop BSoD on continuous errors
356
- await asyncio.sleep(1)
357
-
358
-
359
- async def receive_responses(self):
360
- """ Task to receive responses (audio/text) from Gemini """
361
- while True:
362
- if not self.session or self.connection_status != "Connected":
363
- # print("receive_responses: Session not ready or not connected, waiting...")
364
- await asyncio.sleep(0.2)
365
- if not self.session: # Check if disconnected while waiting
366
- print("receive_responses: Session closed, stopping task.")
367
- break
368
- continue
369
-
370
- try:
371
- # print("Waiting for Gemini turn...")
372
- turn = self.session.receive() # This blocks until a turn starts
373
- # print("Gemini turn started.")
374
- async for response in turn:
375
- if data := response.data:
376
- # print(f"Received audio chunk: {len(data)} bytes")
377
- self.current_audio_buffer.write(data)
378
- if text := response.text:
379
- # print(f"Received text chunk: {text}")
380
- self.current_text_response += text
381
-
382
- # Turn complete - put results onto response queues and signal Gradio handler
383
- # print("Gemini turn complete.")
384
- audio_data = self.current_audio_buffer.getvalue()
385
- # print(f"Total audio received: {len(audio_data)} bytes")
386
- # print(f"Total text received: {self.current_text_response}")
387
-
388
- await self.response_audio_queue.put(audio_data)
389
- await self.response_text_queue.put(self.current_text_response)
390
- self.response_event.set() # Signal that response is ready for the Gradio handler
391
-
392
- except asyncio.CancelledError:
393
- print("receive_responses task cancelled.")
394
- break
395
- except google.api_core.exceptions.Cancelled:
396
- print("Gemini receive cancelled (likely due to interruption or end)")
397
- # Signal completion even if cancelled externally
398
- await self.response_audio_queue.put(self.current_audio_buffer.getvalue())
399
- await self.response_text_queue.put(self.current_text_response + " [Receive Cancelled]")
400
- self.response_event.set()
401
- except Exception as e:
402
- print(f"Error receiving responses: {e}")
403
- traceback.print_exc()
404
- # Signal completion with error to unblock handler
405
- await self.response_audio_queue.put(b"") # Empty audio
406
- await self.response_text_queue.put(f"Error receiving response: {e}")
407
- self.response_event.set()
408
- # Pause on significant errors to avoid spamming logs
409
- if isinstance(e, (google.api_core.exceptions.GoogleAPICallError, ConnectionError)):
410
- print("Connection error in receive_responses, pausing...")
411
- self.connection_status = f"Receive Error: {e}" # Update status
412
- await asyncio.sleep(5)
413
-
414
-
415
- async def send_message_and_wait_for_response(self, text):
416
- """ Puts text on input queue and waits for the response event """
417
- if not self.session or self.connection_status != "Connected":
418
- return f"Error: Not connected ({self.connection_status}).", None
419
-
420
- await self.text_input_queue.put(text)
421
- print("Waiting for response event...")
422
- try:
423
- # Wait for the event with a timeout
424
- await asyncio.wait_for(self.response_event.wait(), timeout=60.0) # 60 second timeout
425
- print("Response event received.")
426
- except asyncio.TimeoutError:
427
- print("Timeout waiting for Gemini response.")
428
- return "Error: Timeout waiting for response.", None
429
- except Exception as e:
430
- print(f"Error waiting for response event: {e}")
431
- return f"Error waiting for response: {e}", None
432
-
433
- # Retrieve results from response queues
434
- # Use get_nowait as the event guarantees items are present
435
- try:
436
- audio_data = self.response_audio_queue.get_nowait()
437
- text_response = self.response_text_queue.get_nowait()
438
- self.response_audio_queue.task_done()
439
- self.response_text_queue.task_done()
440
- except asyncio.QueueEmpty:
441
- print("Error: Response queues were empty after event was set.")
442
- return "Internal Error: Response queues empty.", None
443
- except Exception as e:
444
- print(f"Error retrieving from response queues: {e}")
445
- return f"Internal Error: {e}", None
446
-
447
-
448
- return text_response, audio_data
449
-
450
- async def run_main_loop(self):
451
- """ The main async method to establish connection and manage tasks """
452
- global background_tasks
453
- if not self._initialize_client_and_config():
454
- print("Initialization failed, cannot connect.")
455
- self.connection_status = "Connection Failed: Initialization error."
456
- await run_coro_in_background_loop(update_status(self.connection_status))
457
- return # Stop if client setup failed
458
-
459
- try:
460
- print(f"Attempting to connect to Gemini model: {MODEL}...")
461
- self.connection_status = "Connecting..."
462
- await run_coro_in_background_loop(update_status(self.connection_status))
463
-
464
- # --- Connect to Gemini ---
465
- # Use a timeout for the connection attempt itself
466
- try:
467
- # The actual connection happens within the context manager entry
468
- async with asyncio.wait_for(
469
- self.client.aio.live.connect(model=MODEL, config=self.config),
470
- timeout=30.0 # 30 second timeout for connection
471
- ) as session:
472
- self.session = session
473
- self.connection_status = "Connected"
474
- print("Session established successfully.")
475
- await run_coro_in_background_loop(update_status(self.connection_status))
476
-
477
- # Queue for mic/video data TO Gemini
478
- self.out_queue = asyncio.Queue(maxsize=20)
479
-
480
- # --- Create and manage background tasks ---
481
- tasks = set()
482
- tasks.add(asyncio.create_task(self.process_text_inputs(), name="process_text_inputs"))
483
- tasks.add(asyncio.create_task(self.receive_responses(), name="receive_responses"))
484
-
485
- if self.video_mode != "none":
486
- tasks.add(asyncio.create_task(self.send_realtime(), name="send_realtime"))
487
- if self.video_mode == "camera":
488
- print("Starting camera input task...")
489
- tasks.add(asyncio.create_task(self.get_frames(), name="get_frames"))
490
- elif self.video_mode == "screen":
491
- print("Starting screen capture task...")
492
- tasks.add(asyncio.create_task(self.get_screen(), name="get_screen"))
493
- # Option to add microphone input alongside video if needed
494
- # print("Starting microphone input task...")
495
- # tasks.add(asyncio.create_task(self.listen_audio(), name="listen_audio"))
496
-
497
- background_tasks.update(tasks)
498
-
499
- # Keep running while connected and tasks are active
500
- # We primarily rely on receive_responses to detect session closure/errors
501
- while self.connection_status == "Connected" and self.session:
502
- await asyncio.sleep(0.5) # Check status periodically
503
-
504
- print("Exiting main run loop (disconnected or error).")
505
-
506
- except asyncio.TimeoutError:
507
- print("CONNECTION FAILED: Timeout while trying to connect.")
508
- self.connection_status = "Connection Failed: Timeout"
509
- await run_coro_in_background_loop(update_status(self.connection_status))
510
- except google.api_core.exceptions.PermissionDenied as e:
511
- print(f"CONNECTION FAILED: Permission Denied. Check API key and permissions. {e}")
512
- self.connection_status = "Connection Failed: Permission Denied"
513
- await run_coro_in_background_loop(update_status(f"{self.connection_status}. Check API Key."))
514
- except google.api_core.exceptions.InvalidArgument as e:
515
- print(f"CONNECTION FAILED: Invalid Argument. Check model name ('{MODEL}') and config. {e}")
516
- self.connection_status = f"Connection Failed: Invalid Argument (Model/Config?)"
517
- await run_coro_in_background_loop(update_status(f"{self.connection_status} Details: {e}"))
518
- except Exception as e: # Catch other potential connection errors
519
- print(f"CONNECTION FAILED: An unexpected error occurred during connection. {e}")
520
- traceback.print_exc()
521
- self.connection_status = f"Connection Failed: {e}"
522
- await run_coro_in_background_loop(update_status(self.connection_status))
523
-
524
- except asyncio.CancelledError:
525
- print("run_main_loop task cancelled.")
526
- self.connection_status = "Disconnected (Cancelled)"
527
- except Exception as e:
528
- print(f"Error in AudioLoop run_main_loop: {e}")
529
- traceback.print_exc()
530
- self.connection_status = f"Runtime Error: {e}"
531
- await run_coro_in_background_loop(update_status(self.connection_status))
532
- finally:
533
- print("Cleaning up audio loop resources...")
534
- final_status = self.connection_status # Capture status before changing
535
- if final_status == "Connected": # If loop exited cleanly but was connected
536
- final_status = "Disconnected"
537
- self.connection_status = "Disconnected" # Ensure status is updated
538
-
539
- # Cancel remaining background tasks associated with *this* instance
540
- tasks_to_cancel = list(background_tasks) # Iterate over a copy
541
- background_tasks.clear() # Clear global set for this instance
542
- for task in tasks_to_cancel:
543
- if task and not task.done():
544
- task.cancel()
545
- print(f"Cancelled task: {task.get_name()}")
546
-
547
- # Close PyAudio stream if open (managed by listen_audio task's finally now)
548
- # if self.audio_stream and not self.audio_stream.is_stopped(): ... handled in listen_audio
549
-
550
- # Reset session and client
551
- self.session = None # Important to signal disconnection
552
- self.client = None
553
- self.out_queue = None # Clear queue reference
554
-
555
- print("AudioLoop run finished.")
556
- await run_coro_in_background_loop(update_status(final_status)) # Update Gradio status
557
-
558
- async def disconnect(self):
559
- """Initiates the disconnection process."""
560
- print("Disconnect requested.")
561
- if self.session:
562
- # The run_main_loop should detect the session closing or errors.
563
- # Explicitly closing the session might be possible depending on the SDK,
564
- # but often letting the context manager exit is the intended way.
565
- # For now, just update status and let the main loop handle cleanup.
566
- print("Setting status to Disconnecting...")
567
- self.connection_status = "Disconnecting"
568
- # Signal tasks relying on session to stop
569
- self.session = None # This should help loops terminate
570
- # Put None on input queue to stop text processor if waiting
571
- await self.text_input_queue.put(None)
572
- else:
573
- self.connection_status = "Disconnected"
574
-
575
- # Cleanup might happen in run_main_loop's finally block
576
- await run_coro_in_background_loop(update_status("Disconnected"))
577
-
578
-
579
- # --- Helper Functions ---
580
-
581
- def start_asyncio_loop():
582
- """Starts the asyncio event loop in a separate thread."""
583
- global background_loop, stop_background_loop
584
- stop_background_loop = False
585
- background_loop = asyncio.new_event_loop()
586
- asyncio.set_event_loop(background_loop)
587
- print("Background asyncio loop starting.")
588
- try:
589
- # Run until explicitly stopped
590
- while not stop_background_loop:
591
- background_loop.call_later(0.1, background_loop.stop) # Wake up periodically
592
- background_loop.run_forever()
593
- if stop_background_loop:
594
- print("Stop signal received, exiting run_forever loop.")
595
- break # Exit outer loop if stopped
596
-
597
- # Run pending tasks before closing
598
- print("Running pending tasks before closing loop...")
599
- pending = asyncio.all_tasks(loop=background_loop)
600
- if pending:
601
- background_loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
602
-
603
- except Exception as e:
604
- print(f"Error in background loop: {e}")
605
- traceback.print_exc()
606
- finally:
607
- if background_loop.is_running():
608
- background_loop.stop()
609
- print("Closing background loop...")
610
- # Give tasks a moment to finish cancelling
611
- time.sleep(0.5)
612
- background_loop.close()
613
- print("Background asyncio loop stopped.")
614
- background_loop = None # Clear global ref
615
-
616
- def stop_asyncio_loop():
617
- """Signals the background asyncio loop to stop."""
618
- global stop_background_loop, background_loop
619
- print("Signalling background loop to stop...")
620
- stop_background_loop = True
621
- if background_loop and background_loop.is_running():
622
- # This helps wake up the loop if it's idle
623
- background_loop.call_soon_threadsafe(background_loop.stop)
624
-
625
- async def run_coro_in_background_loop(coro):
626
- """Submits a coroutine to the background event loop and returns its future."""
627
- global background_loop
628
- if background_loop and background_loop.is_running() and not stop_background_loop:
629
- try:
630
- # Use run_coroutine_threadsafe for thread safety
631
- future = asyncio.run_coroutine_threadsafe(coro, background_loop)
632
- return future # Return the concurrent.futures.Future
633
- except RuntimeError as e:
634
- # Handle cases where the loop might be shutting down
635
- print(f"Error submitting coroutine (loop shutting down?): {e}")
636
- future = asyncio.Future()
637
- future.set_exception(e)
638
- return None # Indicate failure to schedule
639
- except Exception as e:
640
- print(f"Unexpected error submitting coroutine: {e}")
641
- future = asyncio.Future()
642
- future.set_exception(e)
643
- return None
644
- else:
645
- print("Error: Background asyncio loop not running or stopping.")
646
- # Create a dummy future that resolves immediately with an error?
647
- # Or just return None to indicate failure
648
- return None
649
-
650
- def format_audio_for_gradio(pcm_data):
651
- """Converts raw PCM data to a format Gradio's Audio component can use."""
652
- if not pcm_data:
653
- # print("No audio data received to format.")
654
- return None
655
- try:
656
- # Ensure PyAudio is initialized to get sample width
657
- if not pya: initialize_py_audio()
658
- if not pya: return None # Could not initialize
659
-
660
- # Create a WAV file in memory
661
- wav_buffer = io.BytesIO()
662
- with wave.open(wav_buffer, 'wb') as wf:
663
- wf.setnchannels(CHANNELS)
664
- wf.setsampwidth(pya.get_sample_size(FORMAT)) # Should be 2 for paInt16
665
- wf.setframerate(RECEIVE_SAMPLE_RATE)
666
- wf.writeframes(pcm_data)
667
- wav_buffer.seek(0)
668
-
669
- # Read the WAV data back and convert to numpy array
670
- with wave.open(wav_buffer, 'rb') as wf_read:
671
- n_frames = wf_read.getnframes()
672
- data = wf_read.readframes(n_frames)
673
- dtype = np.int16 # Based on pyaudio.paInt16
674
- numpy_array = np.frombuffer(data, dtype=dtype)
675
-
676
- # print(f"Formatted audio: {len(numpy_array)} samples, rate {RECEIVE_SAMPLE_RATE}")
677
- # Return tuple for Gradio Audio: (sample_rate, numpy_array)
678
- return (RECEIVE_SAMPLE_RATE, numpy_array)
679
- except Exception as e:
680
- print(f"Error formatting audio: {e}")
681
- traceback.print_exc()
682
- return None
683
-
684
- def initialize_py_audio():
685
- global pya
686
- if pya is None:
687
- try:
688
- print("Initializing PyAudio...")
689
- pya = pyaudio.PyAudio()
690
- print("PyAudio initialized.")
691
- return True
692
- except Exception as e:
693
- print(f"Failed to initialize PyAudio: {e}")
694
- pya = None
695
- return False
696
- return True # Already initialized
697
-
698
- def terminate_py_audio():
699
- global pya
700
- if pya:
701
- print("Terminating PyAudio...")
702
- try:
703
- pya.terminate()
704
- except Exception as e:
705
- print(f"Error terminating PyAudio: {e}")
706
- finally:
707
- pya = None
708
- print("PyAudio terminated.")
709
-
710
-
711
- # --- Gradio Interface and Handlers ---
712
-
713
- # Placeholder for status updates - needs to run in the background loop
714
- async def update_status(new_status: str):
715
- """Coroutine to update the Gradio status component."""
716
- # This function itself doesn't directly update Gradio.
717
- # It relies on being scheduled and the Gradio handler returning the value.
718
- # However, for internal logging, we print here.
719
- print(f"Status Update (async): {new_status}")
720
- # The actual update happens when the calling handler returns this status
721
- # For direct async updates, you'd need Gradio's streaming features if applicable.
722
-
723
-
724
- def handle_connect(api_key, voice_name, video_mode):
725
- """Handles the 'Connect' button click."""
726
- global audio_loop_instance, background_loop, background_thread
727
- print("\n--- Connect Button Clicked ---")
728
- status = "Connecting..."
729
- yield status, None, None # Initial status update
730
-
731
- if not api_key:
732
- yield "Error: Please enter a Gemini API key.", None, None
733
- return
734
- if audio_loop_instance and audio_loop_instance.connection_status not in ["Disconnected", "Initialization Error", "Connection Failed: Timeout", "Connection Failed: Permission Denied", "Connection Failed: Invalid Argument (Model/Config?)"]:
735
- yield f"Already connected or connecting ({audio_loop_instance.connection_status}). Disconnect first.", None, None
736
- return
737
-
738
- # Start background loop thread if not running
739
- if not background_thread or not background_thread.is_alive():
740
- print("Starting background thread...")
741
- background_thread = threading.Thread(target=start_asyncio_loop, daemon=True)
742
- background_thread.start()
743
- time.sleep(0.5) # Give the loop a moment to start
744
-
745
- # Ensure PyAudio is initialized
746
- if not initialize_py_audio():
747
- yield "Error: Failed to initialize audio system.", None, None
748
- return
749
-
750
- print(f"Attempting to connect with voice: {voice_name}, video: {video_mode}")
751
- audio_loop_instance = GradioAudioLoop(video_mode=video_mode, api_key=api_key, voice_name=voice_name)
752
-
753
- # Run the audio loop's main logic in the background asyncio loop
754
- connect_future = run_coro_in_background_loop(audio_loop_instance.run_main_loop())
755
- if not connect_future:
756
- audio_loop_instance = None # Cleanup if scheduling failed
757
- yield "Error: Failed to schedule connection task.", None, None
758
- return
759
-
760
- # Don't block Gradio here. The run_main_loop will update status via update_status coroutine calls.
761
- # We yield the initial "Connecting..." status. Subsequent updates handled async.
762
- # We might need a short sleep/check or rely purely on async updates. Let's rely on async updates.
763
- # yield "Connecting... Waiting for confirmation.", None, None
764
- # Add a small delay to allow the initial connection steps to run and update status
765
- await asyncio.sleep(1) # Use await if in async context, time.sleep otherwise? Gradio handler might be sync.
766
- # Use time.sleep in sync Gradio handler context
767
- time.sleep(1.5)
768
-
769
- # The final status will be updated by the run_main_loop's finally block or error handling
770
- # Check the instance status directly after a short wait
771
- if audio_loop_instance:
772
- current_status = audio_loop_instance.connection_status
773
- yield current_status, None, None
774
- else:
775
- # This case shouldn't happen if scheduling worked, but as a fallback
776
- yield "Error: Connection process failed unexpectedly.", None, None
777
-
778
-
779
- def handle_disconnect():
780
- """Handles the 'Disconnect' button click."""
781
- global audio_loop_instance
782
- print("\n--- Disconnect Button Clicked ---")
783
- status = "Disconnecting..."
784
- yield status, None, None # Initial status update
785
-
786
- if not audio_loop_instance or audio_loop_instance.connection_status == "Disconnected":
787
- yield "Already disconnected.", None, None
788
- return
789
-
790
- # Schedule the disconnect coroutine
791
- disconnect_future = run_coro_in_background_loop(audio_loop_instance.disconnect())
792
- if not disconnect_future:
793
- yield "Error: Failed to schedule disconnection task.", None, None
794
- return
795
-
796
- try:
797
- # Wait briefly for disconnect to initiate
798
- disconnect_future.result(timeout=5.0)
799
- status = "Disconnected"
800
- except TimeoutError:
801
- status = "Disconnect timeout. Check logs."
802
- print("Timeout waiting for disconnect confirmation.")
803
- except Exception as e:
804
- status = f"Error during disconnect: {e}"
805
- print(f"Error during disconnect future result: {e}")
806
-
807
- # Clean up global instance
808
- audio_loop_instance = None
809
-
810
- # Optionally terminate PyAudio here or let atexit handle it
811
- # terminate_py_audio() # Can cause issues if connect is clicked again quickly
812
-
813
- yield status, None, None # Final status update
814
-
815
-
816
- def handle_send_message(message):
817
- """Handles sending a text message."""
818
- global audio_loop_instance
819
- print(f"\n--- Sending Message: {message[:30]}... ---")
820
-
821
- if not audio_loop_instance or audio_loop_instance.connection_status != "Connected":
822
- yield "Error: Not connected. Cannot send message.", None # Update status text, no audio
823
- return
824
-
825
- if not message or message.strip() == "":
826
- yield "Cannot send empty message.", None
827
- return
828
-
829
- # Clear previous outputs
830
- yield "Sending message...", None # Update status, clear audio
831
-
832
- # Schedule the send/receive task and wait for its result
833
- response_future = run_coro_in_background_loop(
834
- audio_loop_instance.send_message_and_wait_for_response(message)
835
  )
836
-
837
- if not response_future:
838
- yield "Error: Failed to schedule message task.", None
839
- return
840
-
841
- text_response = "Error: No response received."
842
- audio_output = None
 
 
 
 
 
 
 
 
 
 
 
843
  try:
844
- # Wait for the background task to complete and return results
845
- # Adjust timeout as needed
846
- result_text, result_audio_data = response_future.result(timeout=60.0) # Wait up to 60 secs
847
-
848
- text_response = result_text
849
- if result_audio_data:
850
- print(f"Received audio data ({len(result_audio_data)} bytes), formatting...")
851
- audio_output = format_audio_for_gradio(result_audio_data)
852
- if audio_output is None:
853
- print("Failed to format audio for Gradio.")
854
- text_response += " [Audio Formatting Error]"
855
- else:
856
- print("No audio data received in response.")
857
- text_response += " [No Audio Received]"
858
-
859
-
860
- except TimeoutError:
861
- print("Timeout waiting for response future.")
862
- text_response = "Error: Timeout waiting for Gemini response."
863
- # Optionally try to cancel the future if possible/needed
864
  except Exception as e:
865
- print(f"Error getting result from response future: {e}")
866
- traceback.print_exc()
867
- text_response = f"Error processing response: {e}"
868
-
869
- print(f"Final Text Response: {text_response}")
870
- print(f"Final Audio Output: {'Present' if audio_output else 'None'}")
871
- yield text_response, audio_output
872
-
873
-
874
- # --- Gradio Interface Definition ---
875
- with gr.Blocks(theme=gr.themes.Soft()) as demo:
876
- gr.Markdown("# Gemini LiveConnect TTS Interface")
877
- gr.Markdown(f"Using Model: `{MODEL}`")
878
-
879
- with gr.Row():
880
- api_key_input = gr.Textbox(label="Gemini API Key", type="password", placeholder="Enter your API key")
881
- voice_select = gr.Dropdown(label="Select Voice", choices=AVAILABLE_VOICES, value=DEFAULT_VOICE)
882
- video_mode_select = gr.Radio(label="Video Input (Optional)", choices=["none", "camera", "screen"], value=DEFAULT_VIDEO_MODE, visible=False) # Hidden for now, focus on TTS
883
-
884
- with gr.Row():
885
- connect_button = gr.Button("Connect")
886
- disconnect_button = gr.Button("Disconnect")
887
 
888
- status_output = gr.Textbox(label="Status", value="Disconnected", interactive=False)
889
-
890
- with gr.Column():
891
- message_input = gr.Textbox(label="Your Message", placeholder="Type your message here...")
892
- send_button = gr.Button("Send Message")
893
-
894
- with gr.Column():
895
- gr.Markdown("## Response")
896
- response_text_output = gr.Textbox(label="Gemini Text", interactive=False)
897
- audio_output = gr.Audio(label="Gemini Audio", type="numpy", interactive=False) # Use numpy for (rate, data) tuple
898
-
899
- # --- Event Handlers ---
900
- connect_button.click(
901
- fn=handle_connect,
902
- inputs=[api_key_input, voice_select, video_mode_select],
903
- outputs=[status_output, response_text_output, audio_output] # Clear outputs on connect
904
- )
905
- disconnect_button.click(
906
- fn=handle_disconnect,
907
- inputs=[],
908
- outputs=[status_output, response_text_output, audio_output] # Clear outputs on disconnect
909
- )
910
- send_button.click(
911
- fn=handle_send_message,
912
- inputs=[message_input],
913
- outputs=[response_text_output, audio_output]
914
- )
915
- # Allow sending message by pressing Enter in the textbox
916
- message_input.submit(
917
- fn=handle_send_message,
918
- inputs=[message_input],
919
- outputs=[response_text_output, audio_output]
920
- )
921
-
922
-
923
- # --- Cleanup Function ---
924
- def cleanup():
925
- print("Running cleanup...")
926
- global audio_loop_instance
927
-
928
- # Disconnect if connected
929
- if audio_loop_instance and audio_loop_instance.connection_status != "Disconnected":
930
- print("Disconnecting during cleanup...")
931
- disconnect_future = run_coro_in_background_loop(audio_loop_instance.disconnect())
932
- if disconnect_future:
933
- try:
934
- disconnect_future.result(timeout=5.0)
935
- print("Disconnect successful during cleanup.")
936
- except Exception as e:
937
- print(f"Error during cleanup disconnect: {e}")
938
- audio_loop_instance = None
939
-
940
- # Signal background loop to stop
941
- stop_asyncio_loop()
942
-
943
- # Wait for background thread to finish
944
- if background_thread and background_thread.is_alive():
945
- print("Waiting for background thread to join...")
946
- background_thread.join(timeout=5.0)
947
- if background_thread.is_alive():
948
- print("Warning: Background thread did not exit cleanly.")
949
-
950
- # Terminate PyAudio
951
- terminate_py_audio()
952
- print("Cleanup finished.")
953
-
954
- # Register cleanup function to run on exit
955
- atexit.register(cleanup)
956
-
957
- # --- Main Execution ---
958
  if __name__ == "__main__":
959
- # Start the background thread immediately (optional, connect can start it too)
960
- # print("Starting background thread on launch...")
961
- # background_thread = threading.Thread(target=start_asyncio_loop, daemon=True)
962
- # background_thread.start()
963
-
964
- print("Launching Gradio Interface...")
965
- # Share=True to create a public link (remove if not needed)
966
- demo.queue().launch(share=False)
967
-
968
- # Keep main thread alive while Gradio is running (Gradio launch blocks)
969
- print("Gradio Interface closed.")
970
- # Cleanup is handled by atexit
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
2
+ import asyncio
3
+ from google import genai
4
+ from google.genai import types
5
+ import gradio as gr
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6
 
7
+ # Set your Gemini API key (configure via Hugging Face Secrets)
8
+ API_KEY = os.getenv("GEMINI_API_KEY")
 
 
 
 
 
 
 
 
 
9
 
10
+ client = genai.Client(api_key=API_KEY)
 
 
11
 
12
+ async def generate_audio(text):
13
+ config = types.LiveConnectConfig(
14
+ response_modalities=["audio"],
15
+ speech_config=types.SpeechConfig(
16
+ voice_config=types.VoiceConfig(
17
+ prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Puck")
 
 
 
 
 
 
18
  )
19
+ ),
20
+ system_instruction=types.Content(
21
+ parts=[types.Part.from_text("Repeat user input exactly without explanation")],
22
+ role="user"
23
+ ),
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
24
  )
25
+
26
+ audio_data = b""
27
+ async with client.aio.live.connect(model="models/gemini-2.0-flash-exp", config=config) as session:
28
+ await session.send(input=text, end_of_turn=True)
29
+ async for response in session.receive():
30
+ if data := response.data:
31
+ audio_data += data
32
+
33
+ # Save as WAV file (16-bit PCM, 24kHz)
34
+ with open("output.wav", "wb") as f:
35
+ f.write(b"RIFF\x00\x00\x00\x00WAVEfmt \x10\x00\x00\x00\x01\x00\x01\x00\x00\x7d\x00\x00\x02\x00\x10\x00data\x00\x00\x00\x00")
36
+ f.write(audio_data)
37
+
38
+ return "output.wav"
39
+
40
+ def tts(text):
41
+ if not text.strip():
42
+ return None
43
  try:
44
+ asyncio.run(generate_audio(text))
45
+ return "output.wav"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
  except Exception as e:
47
+ return f"Error: {str(e)}"
48
+
49
+ # Gradio Interface
50
+ iface = gr.Interface(
51
+ fn=tts,
52
+ inputs=gr.Textbox(label="Enter Text", placeholder="Type here..."),
53
+ outputs=gr.Audio(label="TTS Output", type="filepath"),
54
+ examples=["Hello, this is a test.", "How are you today?"],
55
+ title="Gemini TTS Demo",
56
+ description="Convert text to speech using Google's Gemini 2.0 Flash model"
57
+ )
 
 
 
 
 
 
 
 
 
 
 
58
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59
  if __name__ == "__main__":
60
+ iface.launch()