|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import asyncio |
|
import base64 |
|
from enum import Enum |
|
import json |
|
import traceback |
|
import time |
|
from google import genai |
|
import numpy as np |
|
from typing import Iterable, cast |
|
|
|
import websockets |
|
|
|
from ten import ( |
|
AudioFrame, |
|
AsyncTenEnv, |
|
Cmd, |
|
StatusCode, |
|
CmdResult, |
|
Data, |
|
) |
|
from ten.audio_frame import AudioFrameDataFmt |
|
from ten_ai_base.const import CMD_PROPERTY_RESULT, CMD_TOOL_CALL |
|
from dataclasses import dataclass |
|
from ten_ai_base.config import BaseConfig |
|
from ten_ai_base.chat_memory import ChatMemory |
|
from ten_ai_base.usage import ( |
|
LLMUsage, |
|
LLMCompletionTokensDetails, |
|
LLMPromptTokensDetails, |
|
) |
|
from ten_ai_base.types import ( |
|
LLMToolMetadata, |
|
LLMToolResult, |
|
LLMChatCompletionContentPartParam, |
|
TTSPcmOptions, |
|
) |
|
from ten_ai_base.llm import AsyncLLMBaseExtension |
|
from google.genai.types import ( |
|
LiveServerMessage, |
|
LiveConnectConfig, |
|
LiveConnectConfigDict, |
|
GenerationConfig, |
|
Content, |
|
Part, |
|
Tool, |
|
FunctionDeclaration, |
|
Schema, |
|
LiveClientToolResponse, |
|
FunctionCall, |
|
FunctionResponse, |
|
SpeechConfig, |
|
VoiceConfig, |
|
PrebuiltVoiceConfig, |
|
) |
|
from google.genai.live import AsyncSession |
|
from PIL import Image |
|
from io import BytesIO |
|
from base64 import b64encode |
|
|
|
import urllib.parse |
|
import google.genai._api_client |
|
|
|
google.genai._api_client.urllib = urllib |
|
|
|
CMD_IN_FLUSH = "flush" |
|
CMD_IN_ON_USER_JOINED = "on_user_joined" |
|
CMD_IN_ON_USER_LEFT = "on_user_left" |
|
CMD_OUT_FLUSH = "flush" |
|
|
|
|
|
class Role(str, Enum): |
|
User = "user" |
|
Assistant = "assistant" |
|
|
|
|
|
def rgb2base64jpeg(rgb_data, width, height): |
|
|
|
pil_image = Image.frombytes("RGBA", (width, height), bytes(rgb_data)) |
|
pil_image = pil_image.convert("RGB") |
|
|
|
|
|
pil_image = resize_image_keep_aspect(pil_image, 512) |
|
|
|
|
|
buffered = BytesIO() |
|
pil_image.save(buffered, format="JPEG") |
|
|
|
|
|
|
|
jpeg_image_data = buffered.getvalue() |
|
|
|
|
|
base64_encoded_image = b64encode(jpeg_image_data).decode("utf-8") |
|
|
|
|
|
|
|
return base64_encoded_image |
|
|
|
|
|
def resize_image_keep_aspect(image, max_size=512): |
|
""" |
|
Resize an image while maintaining its aspect ratio, ensuring the larger dimension is max_size. |
|
If both dimensions are smaller than max_size, the image is not resized. |
|
|
|
:param image: A PIL Image object |
|
:param max_size: The maximum size for the larger dimension (width or height) |
|
:return: A PIL Image object (resized or original) |
|
""" |
|
|
|
width, height = image.size |
|
|
|
|
|
if width <= max_size and height <= max_size: |
|
return image |
|
|
|
|
|
aspect_ratio = width / height |
|
|
|
|
|
if width > height: |
|
new_width = max_size |
|
new_height = int(max_size / aspect_ratio) |
|
else: |
|
new_height = max_size |
|
new_width = int(max_size * aspect_ratio) |
|
|
|
|
|
resized_image = image.resize((new_width, new_height)) |
|
|
|
return resized_image |
|
|
|
|
|
@dataclass |
|
class GeminiRealtimeConfig(BaseConfig): |
|
base_uri: str = "generativelanguage.googleapis.com" |
|
api_key: str = "" |
|
api_version: str = "v1alpha" |
|
model: str = "gemini-2.0-flash-exp" |
|
language: str = "en-US" |
|
prompt: str = "" |
|
temperature: float = 0.5 |
|
max_tokens: int = 1024 |
|
voice: str = "Puck" |
|
server_vad: bool = True |
|
audio_out: bool = True |
|
input_transcript: bool = True |
|
sample_rate: int = 24000 |
|
stream_id: int = 0 |
|
dump: bool = False |
|
greeting: str = "" |
|
|
|
def build_ctx(self) -> dict: |
|
return { |
|
"language": self.language, |
|
"model": self.model, |
|
} |
|
|
|
|
|
class GeminiRealtimeExtension(AsyncLLMBaseExtension): |
|
def __init__(self, name): |
|
super().__init__(name) |
|
self.config: GeminiRealtimeConfig = None |
|
self.stopped: bool = False |
|
self.connected: bool = False |
|
self.buffer: bytearray = b"" |
|
self.memory: ChatMemory = None |
|
self.total_usage: LLMUsage = LLMUsage() |
|
self.users_count = 0 |
|
|
|
self.stream_id: int = 0 |
|
self.remote_stream_id: int = 0 |
|
self.channel_name: str = "" |
|
self.audio_len_threshold: int = 5120 |
|
|
|
self.completion_times = [] |
|
self.connect_times = [] |
|
self.first_token_times = [] |
|
|
|
self.buff: bytearray = b"" |
|
self.transcript: str = "" |
|
self.ctx: dict = {} |
|
self.input_end = time.time() |
|
self.client = None |
|
self.session: AsyncSession = None |
|
self.leftover_bytes = b"" |
|
self.video_task = None |
|
self.image_queue = asyncio.Queue() |
|
self.video_buff: str = "" |
|
self.loop = None |
|
self.ten_env = None |
|
|
|
async def on_init(self, ten_env: AsyncTenEnv) -> None: |
|
await super().on_init(ten_env) |
|
ten_env.log_debug("on_init") |
|
|
|
async def on_start(self, ten_env: AsyncTenEnv) -> None: |
|
await super().on_start(ten_env) |
|
self.ten_env = ten_env |
|
ten_env.log_debug("on_start") |
|
|
|
self.loop = asyncio.get_event_loop() |
|
|
|
self.config = await GeminiRealtimeConfig.create_async(ten_env=ten_env) |
|
ten_env.log_info(f"config: {self.config}") |
|
|
|
if not self.config.api_key: |
|
ten_env.log_error("api_key is required") |
|
return |
|
|
|
try: |
|
self.ctx = self.config.build_ctx() |
|
self.ctx["greeting"] = self.config.greeting |
|
|
|
self.client = genai.Client( |
|
api_key=self.config.api_key, |
|
http_options={ |
|
"api_version": self.config.api_version, |
|
"url": self.config.base_uri, |
|
}, |
|
) |
|
self.loop.create_task(self._loop(ten_env)) |
|
self.loop.create_task(self._on_video(ten_env)) |
|
|
|
|
|
except Exception as e: |
|
traceback.print_exc() |
|
self.ten_env.log_error(f"Failed to init client {e}") |
|
|
|
async def _loop(self, ten_env: AsyncTenEnv) -> None: |
|
while not self.stopped: |
|
await asyncio.sleep(1) |
|
try: |
|
config: LiveConnectConfig = self._get_session_config() |
|
ten_env.log_info("Start listen") |
|
async with self.client.aio.live.connect( |
|
model=self.config.model, config=config |
|
) as session: |
|
ten_env.log_info("Connected") |
|
session = cast(AsyncSession, session) |
|
self.session = session |
|
self.connected = True |
|
|
|
await self._greeting() |
|
|
|
while True: |
|
try: |
|
async for response in session.receive(): |
|
response = cast(LiveServerMessage, response) |
|
|
|
try: |
|
if response.server_content: |
|
if response.server_content.interrupted: |
|
ten_env.log_info("Interrupted") |
|
await self._flush() |
|
continue |
|
elif ( |
|
not response.server_content.turn_complete |
|
and response.server_content.model_turn |
|
): |
|
for ( |
|
part |
|
) in ( |
|
response.server_content.model_turn.parts |
|
): |
|
await self.send_audio_out( |
|
ten_env, |
|
part.inline_data.data, |
|
sample_rate=24000, |
|
bytes_per_sample=2, |
|
number_of_channels=1, |
|
) |
|
elif response.server_content.turn_complete: |
|
ten_env.log_info("Turn complete") |
|
elif response.setup_complete: |
|
ten_env.log_info("Setup complete") |
|
elif response.tool_call: |
|
func_calls = response.tool_call.function_calls |
|
self.loop.create_task( |
|
self._handle_tool_call(func_calls) |
|
) |
|
except Exception: |
|
traceback.print_exc() |
|
ten_env.log_error("Failed to handle response") |
|
|
|
await self._flush() |
|
ten_env.log_info("Finish listen") |
|
except websockets.exceptions.ConnectionClosedOK: |
|
ten_env.log_info("Connection closed") |
|
break |
|
except Exception as e: |
|
self.ten_env.log_error(f"Failed to handle loop {e}") |
|
|
|
async def send_audio_out( |
|
self, ten_env: AsyncTenEnv, audio_data: bytes, **args: TTSPcmOptions |
|
) -> None: |
|
"""End sending audio out.""" |
|
sample_rate = args.get("sample_rate", 24000) |
|
bytes_per_sample = args.get("bytes_per_sample", 2) |
|
number_of_channels = args.get("number_of_channels", 1) |
|
try: |
|
|
|
combined_data = self.leftover_bytes + audio_data |
|
|
|
|
|
if len(combined_data) % (bytes_per_sample * number_of_channels) != 0: |
|
|
|
valid_length = len(combined_data) - ( |
|
len(combined_data) % (bytes_per_sample * number_of_channels) |
|
) |
|
self.leftover_bytes = combined_data[valid_length:] |
|
combined_data = combined_data[:valid_length] |
|
else: |
|
self.leftover_bytes = b"" |
|
|
|
if combined_data: |
|
f = AudioFrame.create("pcm_frame") |
|
f.set_sample_rate(sample_rate) |
|
f.set_bytes_per_sample(bytes_per_sample) |
|
f.set_number_of_channels(number_of_channels) |
|
f.set_data_fmt(AudioFrameDataFmt.INTERLEAVE) |
|
f.set_samples_per_channel( |
|
len(combined_data) // (bytes_per_sample * number_of_channels) |
|
) |
|
f.alloc_buf(len(combined_data)) |
|
buff = f.lock_buf() |
|
buff[:] = combined_data |
|
f.unlock_buf(buff) |
|
await ten_env.send_audio_frame(f) |
|
except Exception: |
|
pass |
|
|
|
|
|
async def on_stop(self, ten_env: AsyncTenEnv) -> None: |
|
await super().on_stop(ten_env) |
|
ten_env.log_info("on_stop") |
|
|
|
self.stopped = True |
|
if self.session: |
|
await self.session.close() |
|
|
|
async def on_audio_frame( |
|
self, ten_env: AsyncTenEnv, audio_frame: AudioFrame |
|
) -> None: |
|
await super().on_audio_frame(ten_env, audio_frame) |
|
try: |
|
stream_id = audio_frame.get_property_int("stream_id") |
|
if self.channel_name == "": |
|
self.channel_name = audio_frame.get_property_string("channel") |
|
|
|
if self.remote_stream_id == 0: |
|
self.remote_stream_id = stream_id |
|
|
|
frame_buf = audio_frame.get_buf() |
|
self._dump_audio_if_need(frame_buf, Role.User) |
|
|
|
await self._on_audio(frame_buf) |
|
if not self.config.server_vad: |
|
self.input_end = time.time() |
|
except Exception as e: |
|
traceback.print_exc() |
|
self.ten_env.log_error(f"on audio frame failed {e}") |
|
|
|
async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: |
|
cmd_name = cmd.get_name() |
|
ten_env.log_debug(f"on_cmd name {cmd_name}") |
|
|
|
status = StatusCode.OK |
|
detail = "success" |
|
|
|
if cmd_name == CMD_IN_FLUSH: |
|
|
|
await self._flush() |
|
await ten_env.send_cmd(Cmd.create(CMD_OUT_FLUSH)) |
|
ten_env.log_info("on flush") |
|
elif cmd_name == CMD_IN_ON_USER_JOINED: |
|
self.users_count += 1 |
|
|
|
if self.users_count == 1: |
|
await self._greeting() |
|
elif cmd_name == CMD_IN_ON_USER_LEFT: |
|
self.users_count -= 1 |
|
else: |
|
|
|
await super().on_cmd(ten_env, cmd) |
|
return |
|
|
|
cmd_result = CmdResult.create(status) |
|
cmd_result.set_property_string("detail", detail) |
|
await ten_env.return_result(cmd_result, cmd) |
|
|
|
|
|
async def on_data(self, ten_env: AsyncTenEnv, data: Data) -> None: |
|
pass |
|
|
|
async def on_video_frame(self, async_ten_env, video_frame): |
|
await super().on_video_frame(async_ten_env, video_frame) |
|
image_data = video_frame.get_buf() |
|
image_width = video_frame.get_width() |
|
image_height = video_frame.get_height() |
|
await self.image_queue.put([image_data, image_width, image_height]) |
|
|
|
async def _on_video(self, _: AsyncTenEnv): |
|
while True: |
|
|
|
|
|
[image_data, image_width, image_height] = await self.image_queue.get() |
|
self.video_buff = rgb2base64jpeg(image_data, image_width, image_height) |
|
media_chunks = [ |
|
{ |
|
"data": self.video_buff, |
|
"mime_type": "image/jpeg", |
|
} |
|
] |
|
try: |
|
if self.connected: |
|
|
|
await self.session.send(media_chunks) |
|
except Exception as e: |
|
self.ten_env.log_error(f"Failed to send image {e}") |
|
|
|
|
|
while not self.image_queue.empty(): |
|
await self.image_queue.get() |
|
|
|
|
|
await asyncio.sleep(1) |
|
|
|
|
|
async def _on_audio(self, buff: bytearray): |
|
self.buff += buff |
|
|
|
if self.connected and len(self.buff) >= self.audio_len_threshold: |
|
|
|
try: |
|
media_chunks = [ |
|
{ |
|
"data": base64.b64encode(self.buff).decode(), |
|
"mime_type": "audio/pcm", |
|
} |
|
] |
|
|
|
await self.session.send(media_chunks) |
|
self.buff = b"" |
|
except Exception as e: |
|
|
|
self.ten_env.log_error(f"Failed to send audio {e}") |
|
|
|
def _get_session_config(self) -> LiveConnectConfigDict: |
|
def tool_dict(tool: LLMToolMetadata): |
|
required = [] |
|
properties: dict[str, "Schema"] = {} |
|
|
|
for param in tool.parameters: |
|
properties[param.name] = Schema( |
|
type=param.type.upper(), description=param.description |
|
) |
|
if param.required: |
|
required.append(param.name) |
|
|
|
t = Tool( |
|
function_declarations=[ |
|
FunctionDeclaration( |
|
name=tool.name, |
|
description=tool.description, |
|
parameters=Schema( |
|
type="OBJECT", properties=properties, required=required |
|
), |
|
) |
|
] |
|
) |
|
|
|
return t |
|
|
|
tools = ( |
|
[tool_dict(t) for t in self.available_tools] |
|
if len(self.available_tools) > 0 |
|
else [] |
|
) |
|
|
|
tools.append(Tool(google_search={})) |
|
tools.append(Tool(code_execution={})) |
|
|
|
config = LiveConnectConfig( |
|
response_modalities=["AUDIO"], |
|
system_instruction=Content(parts=[Part(text=self.config.prompt)]), |
|
tools=tools, |
|
|
|
speech_config=SpeechConfig( |
|
voice_config=VoiceConfig( |
|
prebuilt_voice_config=PrebuiltVoiceConfig( |
|
voice_name=self.config.voice |
|
) |
|
) |
|
), |
|
generation_config=GenerationConfig( |
|
temperature=self.config.temperature, |
|
max_output_tokens=self.config.max_tokens, |
|
), |
|
) |
|
|
|
return config |
|
|
|
async def on_tools_update( |
|
self, ten_env: AsyncTenEnv, tool: LLMToolMetadata |
|
) -> None: |
|
"""Called when a new tool is registered. Implement this method to process the new tool.""" |
|
ten_env.log_info(f"on tools update {tool}") |
|
|
|
|
|
def _replace(self, prompt: str) -> str: |
|
result = prompt |
|
for token, value in self.ctx.items(): |
|
result = result.replace("{" + token + "}", value) |
|
return result |
|
|
|
def _send_transcript(self, content: str, role: Role, is_final: bool) -> None: |
|
def is_punctuation(char): |
|
if char in [",", ",", ".", "。", "?", "?", "!", "!"]: |
|
return True |
|
return False |
|
|
|
def parse_sentences(sentence_fragment, content): |
|
sentences = [] |
|
current_sentence = sentence_fragment |
|
for char in content: |
|
current_sentence += char |
|
if is_punctuation(char): |
|
|
|
stripped_sentence = current_sentence |
|
if any(c.isalnum() for c in stripped_sentence): |
|
sentences.append(stripped_sentence) |
|
current_sentence = "" |
|
|
|
remain = current_sentence |
|
return sentences, remain |
|
|
|
def send_data( |
|
ten_env: AsyncTenEnv, |
|
sentence: str, |
|
stream_id: int, |
|
role: str, |
|
is_final: bool, |
|
): |
|
try: |
|
d = Data.create("text_data") |
|
d.set_property_string("text", sentence) |
|
d.set_property_bool("end_of_segment", is_final) |
|
d.set_property_string("role", role) |
|
d.set_property_int("stream_id", stream_id) |
|
ten_env.log_info( |
|
f"send transcript text [{sentence}] stream_id {stream_id} is_final {is_final} end_of_segment {is_final} role {role}" |
|
) |
|
asyncio.create_task(ten_env.send_data(d)) |
|
except Exception as e: |
|
ten_env.log_error( |
|
f"Error send text data {role}: {sentence} {is_final} {e}" |
|
) |
|
|
|
stream_id = self.remote_stream_id if role == Role.User else 0 |
|
try: |
|
if role == Role.Assistant and not is_final: |
|
sentences, self.transcript = parse_sentences(self.transcript, content) |
|
for s in sentences: |
|
asyncio.create_task( |
|
send_data(self.ten_env, s, stream_id, role, is_final) |
|
) |
|
else: |
|
asyncio.create_task( |
|
send_data(self.ten_env, content, stream_id, role, is_final) |
|
) |
|
except Exception as e: |
|
self.ten_env.log_error( |
|
f"Error send text data {role}: {content} {is_final} {e}" |
|
) |
|
|
|
def _dump_audio_if_need(self, buf: bytearray, role: Role) -> None: |
|
if not self.config.dump: |
|
return |
|
|
|
with open("{}_{}.pcm".format(role, self.channel_name), "ab") as dump_file: |
|
dump_file.write(buf) |
|
|
|
async def _handle_tool_call(self, func_calls: list[FunctionCall]) -> None: |
|
function_responses = [] |
|
for call in func_calls: |
|
tool_call_id = call.id |
|
name = call.name |
|
arguments = call.args |
|
self.ten_env.log_info( |
|
f"_handle_tool_call {tool_call_id} {name} {arguments}" |
|
) |
|
cmd: Cmd = Cmd.create(CMD_TOOL_CALL) |
|
cmd.set_property_string("name", name) |
|
cmd.set_property_from_json("arguments", json.dumps(arguments)) |
|
[result, _] = await self.ten_env.send_cmd(cmd) |
|
|
|
func_response = FunctionResponse( |
|
id=tool_call_id, name=name, response={"error": "Failed to call tool"} |
|
) |
|
if result.get_status_code() == StatusCode.OK: |
|
tool_result: LLMToolResult = json.loads( |
|
result.get_property_to_json(CMD_PROPERTY_RESULT) |
|
) |
|
|
|
result_content = tool_result["content"] |
|
func_response = FunctionResponse( |
|
id=tool_call_id, name=name, response={"output": result_content} |
|
) |
|
self.ten_env.log_info(f"tool_result: {tool_call_id} {tool_result}") |
|
else: |
|
self.ten_env.log_error("Tool call failed") |
|
function_responses.append(func_response) |
|
|
|
|
|
self.ten_env.log_info(f"_remote_tool_call finish {name} {arguments}") |
|
try: |
|
self.ten_env.log_info(f"send tool response {function_responses}") |
|
await self.session.send( |
|
LiveClientToolResponse(function_responses=function_responses) |
|
) |
|
except Exception as e: |
|
self.ten_env.log_error(f"Failed to send tool response {e}") |
|
|
|
def _greeting_text(self) -> str: |
|
text = "Hi, there." |
|
if self.config.language == "zh-CN": |
|
text = "你好。" |
|
elif self.config.language == "ja-JP": |
|
text = "こんにちは" |
|
elif self.config.language == "ko-KR": |
|
text = "안녕하세요" |
|
return text |
|
|
|
def _convert_tool_params_to_dict(self, tool: LLMToolMetadata): |
|
json_dict = {"type": "object", "properties": {}, "required": []} |
|
|
|
for param in tool.parameters: |
|
json_dict["properties"][param.name] = { |
|
"type": param.type, |
|
"description": param.description, |
|
} |
|
if param.required: |
|
json_dict["required"].append(param.name) |
|
|
|
return json_dict |
|
|
|
def _convert_to_content_parts( |
|
self, content: Iterable[LLMChatCompletionContentPartParam] |
|
): |
|
content_parts = [] |
|
|
|
if isinstance(content, str): |
|
content_parts.append({"type": "text", "text": content}) |
|
else: |
|
for part in content: |
|
|
|
if part["type"] == "text": |
|
content_parts.append(part) |
|
return content_parts |
|
|
|
async def _greeting(self) -> None: |
|
if self.connected and self.users_count == 1: |
|
text = self._greeting_text() |
|
if self.config.greeting: |
|
text = "Say '" + self.config.greeting + "' to me." |
|
self.ten_env.log_info(f"send greeting {text}") |
|
await self.session.send(text, end_of_turn=True) |
|
|
|
async def _flush(self) -> None: |
|
try: |
|
c = Cmd.create("flush") |
|
await self.ten_env.send_cmd(c) |
|
except Exception: |
|
self.ten_env.log_error("Error flush") |
|
|
|
async def _update_usage(self, usage: dict) -> None: |
|
self.total_usage.completion_tokens += usage.get("output_tokens") |
|
self.total_usage.prompt_tokens += usage.get("input_tokens") |
|
self.total_usage.total_tokens += usage.get("total_tokens") |
|
if not self.total_usage.completion_tokens_details: |
|
self.total_usage.completion_tokens_details = LLMCompletionTokensDetails() |
|
if not self.total_usage.prompt_tokens_details: |
|
self.total_usage.prompt_tokens_details = LLMPromptTokensDetails() |
|
|
|
if usage.get("output_token_details"): |
|
self.total_usage.completion_tokens_details.accepted_prediction_tokens += ( |
|
usage["output_token_details"].get("text_tokens") |
|
) |
|
self.total_usage.completion_tokens_details.audio_tokens += usage[ |
|
"output_token_details" |
|
].get("audio_tokens") |
|
|
|
if usage.get("input_token_details:"): |
|
self.total_usage.prompt_tokens_details.audio_tokens += usage[ |
|
"input_token_details" |
|
].get("audio_tokens") |
|
self.total_usage.prompt_tokens_details.cached_tokens += usage[ |
|
"input_token_details" |
|
].get("cached_tokens") |
|
self.total_usage.prompt_tokens_details.text_tokens += usage[ |
|
"input_token_details" |
|
].get("text_tokens") |
|
|
|
self.ten_env.log_info(f"total usage: {self.total_usage}") |
|
|
|
data = Data.create("llm_stat") |
|
data.set_property_from_json("usage", json.dumps(self.total_usage.model_dump())) |
|
if self.connect_times and self.completion_times and self.first_token_times: |
|
data.set_property_from_json( |
|
"latency", |
|
json.dumps( |
|
{ |
|
"connection_latency_95": np.percentile(self.connect_times, 95), |
|
"completion_latency_95": np.percentile( |
|
self.completion_times, 95 |
|
), |
|
"first_token_latency_95": np.percentile( |
|
self.first_token_times, 95 |
|
), |
|
"connection_latency_99": np.percentile(self.connect_times, 99), |
|
"completion_latency_99": np.percentile( |
|
self.completion_times, 99 |
|
), |
|
"first_token_latency_99": np.percentile( |
|
self.first_token_times, 99 |
|
), |
|
} |
|
), |
|
) |
|
asyncio.create_task(self.ten_env.send_data(data)) |
|
|
|
async def on_call_chat_completion(self, async_ten_env, **kargs): |
|
raise NotImplementedError |
|
|
|
async def on_data_chat_completion(self, async_ten_env, **kargs): |
|
raise NotImplementedError |
|
|