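"""Gemini Live API demo with a Gradio front end.

Streams microphone audio (and optionally camera or screen frames) to a live
Gemini session, plays the model's spoken replies, and surfaces its text
responses in a Gradio text box.

Setup (assumed from the imports, not spelled out in the original): install
google-genai, gradio, pyaudio, opencv-python, mss, and pillow, and export
GEMINI_API_KEY in the environment before launching.
"""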
import asyncio
import base64
import io
import os
import traceback

import cv2
import pyaudio
import PIL.Image
import mss

from google import genai
from google.genai import types

import gradio as gr

FORMAT = pyaudio.paInt16
CHANNELS = 1
SEND_SAMPLE_RATE = 16000
RECEIVE_SAMPLE_RATE = 24000
CHUNK_SIZE = 1024

MODEL = "models/gemini-2.0-flash-exp"

DEFAULT_MODE = "camera"

# Read the API key from the environment rather than hard-coding a placeholder.
client = genai.Client(
    http_options={"api_version": "v1alpha"},
    api_key=os.environ.get("GEMINI_API_KEY"),
)
|
CONFIG = types.LiveConnectConfig(
    response_modalities=[
        "audio",
    ],
    speech_config=types.SpeechConfig(
        voice_config=types.VoiceConfig(
            prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Puck")
        )
    ),
    system_instruction=types.Content(
        parts=[
            types.Part.from_text(
                text=(
                    "Repeat exactly what the user says, word for word, "
                    "with no other words or explanation."
                )
            )
        ],
        role="user",
    ),
)

pya = pyaudio.PyAudio()
|
|
class AudioLoop:
    def __init__(self, video_mode=DEFAULT_MODE):
        self.video_mode = video_mode

        self.audio_in_queue = None
        self.out_queue = None

        self.session = None

        self.send_text_task = None
        self.receive_audio_task = None
        self.play_audio_task = None

        # Opened by listen_audio(); checked in run()'s error handler.
        self.audio_stream = None
|
    async def send_text(self, text):
        """Send one text turn to the live session and mark the turn complete."""
        await self.session.send(input=text or ".", end_of_turn=True)
|
    def _get_frame(self, cap):
        ret, frame = cap.read()
        if not ret:
            return None

        # OpenCV captures BGR; convert to RGB so PIL renders colors correctly.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = PIL.Image.fromarray(frame_rgb)
        img.thumbnail([1024, 1024])

        image_io = io.BytesIO()
        img.save(image_io, format="jpeg")
        image_io.seek(0)

        mime_type = "image/jpeg"
        image_bytes = image_io.read()
        return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}
|
    async def get_frames(self):
        # VideoCapture blocks while opening the camera, so do it in a thread.
        cap = await asyncio.to_thread(cv2.VideoCapture, 0)

        while True:
            frame = await asyncio.to_thread(self._get_frame, cap)
            if frame is None:
                break

            # Send at most one frame per second.
            await asyncio.sleep(1.0)

            await self.out_queue.put(frame)

        cap.release()
|
    def _get_screen(self):
        sct = mss.mss()
        # monitors[0] is the combined virtual screen covering all displays.
        monitor = sct.monitors[0]

        i = sct.grab(monitor)

        # mss yields raw pixels; encode to PNG, then re-encode as JPEG for the API.
        mime_type = "image/jpeg"
        png_bytes = mss.tools.to_png(i.rgb, i.size)
        img = PIL.Image.open(io.BytesIO(png_bytes))

        image_io = io.BytesIO()
        img.save(image_io, format="jpeg")
        image_io.seek(0)

        image_bytes = image_io.read()
        return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}
|
    async def get_screen(self):
        while True:
            frame = await asyncio.to_thread(self._get_screen)
            if frame is None:
                break

            await asyncio.sleep(1.0)

            await self.out_queue.put(frame)
|
    async def send_realtime(self):
        while True:
            msg = await self.out_queue.get()
            await self.session.send(input=msg)
|
    async def listen_audio(self):
        mic_info = pya.get_default_input_device_info()
        self.audio_stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=SEND_SAMPLE_RATE,
            input=True,
            input_device_index=mic_info["index"],
            frames_per_buffer=CHUNK_SIZE,
        )
        if __debug__:
            # Don't let transient input overflows raise while debugging.
            kwargs = {"exception_on_overflow": False}
        else:
            kwargs = {}
        while True:
            data = await asyncio.to_thread(self.audio_stream.read, CHUNK_SIZE, **kwargs)
            await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})
|
    async def receive_audio(self):
        """Background task that reads from the websocket and writes PCM chunks to the output queue."""
        while True:
            turn = self.session.receive()
            async for response in turn:
                if data := response.data:
                    self.audio_in_queue.put_nowait(data)
                    continue
                if text := response.text:
                    # Hand the first text response back to the Gradio caller.
                    return text

            # If the user interrupts the model, it sends a turn_complete.
            # Empty the queue so playback of stale audio stops immediately.
            while not self.audio_in_queue.empty():
                self.audio_in_queue.get_nowait()
|
    async def play_audio(self):
        stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=RECEIVE_SAMPLE_RATE,
            output=True,
        )
        while True:
            bytestream = await self.audio_in_queue.get()
            await asyncio.to_thread(stream.write, bytestream)
|
    async def run(self):
        try:
            async with (
                client.aio.live.connect(model=MODEL, config=CONFIG) as session,
                asyncio.TaskGroup() as tg,
            ):
                self.session = session

                self.audio_in_queue = asyncio.Queue()
                self.out_queue = asyncio.Queue(maxsize=5)

                tg.create_task(self.send_realtime())
                tg.create_task(self.listen_audio())
                if self.video_mode == "camera":
                    tg.create_task(self.get_frames())
                elif self.video_mode == "screen":
                    tg.create_task(self.get_screen())

                tg.create_task(self.play_audio())

                # Await receive_audio() directly instead of also spawning it as
                # a task, so the session has a single reader and its first text
                # response is returned to the caller.
                return await self.receive_audio()

        except asyncio.CancelledError:
            pass
        except ExceptionGroup as eg:
            if self.audio_stream is not None:
                self.audio_stream.close()
            traceback.print_exception(eg)
        except Exception as e:
            traceback.print_exc()
            return f"Error: {str(e)}"
|
|
# One AudioLoop instance shared across Gradio calls.
audio_loop = None
|
|
async def transcribe_audio(text_input):
    """
    Sends text to the Gemini Live session via the AudioLoop class and returns
    the model's text response.
    """
    global audio_loop
    if audio_loop is None:
        audio_loop = AudioLoop(video_mode="none")

    if audio_loop.session is None:
        try:
            # First call: connect and run the loop until the model returns
            # its first text response.
            return await audio_loop.run()
        except Exception as e:
            print(f"Error in run(): {e}")
            traceback.print_exc()
            return f"Error: {str(e)}"
    else:
        try:
            await audio_loop.send_text(text_input)
            return await audio_loop.receive_audio()
        except Exception as e:
            print(f"Error after session is established: {e}")
            traceback.print_exc()
            return f"Error: {str(e)}"
|
|
if __name__ == "__main__":
    iface = gr.Interface(
        fn=transcribe_audio,
        inputs=gr.Textbox(lines=2, placeholder="Enter text here..."),
        outputs="text",
        title="Gemini Live Connect Demo with Gradio",
        description="Enter text, and the model will repeat what you said. A demo of the Gemini Live API with Gradio.",
    )
    iface.launch()