freddyaboulton's picture
Upload folder using huggingface_hub
1dd46a8 verified
raw
history blame
4.38 kB
import json
import logging
import os
from pathlib import Path
import anthropic
import gradio as gr
import numpy as np
from dotenv import load_dotenv
from elevenlabs import ElevenLabs
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, StreamingResponse
from fastrtc import AdditionalOutputs, ReplyOnPause, Stream, get_twilio_turn_credentials
from fastrtc.utils import aggregate_bytes_to_16bit, audio_to_bytes
from gradio.utils import get_space
from groq import Groq
from pydantic import BaseModel
# Root logger at WARNING keeps debug/info chatter from other libraries quiet.
logging.basicConfig(level=logging.WARNING)

# Line layout shared by everything written to the log file.
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# Despite its name this is a *file* handler: records are written to
# gradio_webrtc.log in the current working directory.
console_handler = logging.FileHandler("gradio_webrtc.log")
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.DEBUG)

# Only the "fastrtc" logger is opened up to DEBUG and attached to the file.
logger = logging.getLogger("fastrtc")
logger.addHandler(console_handler)
logger.setLevel(logging.DEBUG)
# Directory containing this file; index.html is read relative to it.
curr_dir = Path(__file__).parent

# Load variables from a .env file before any client is constructed, so that
# the environment lookups below can see them.
load_dotenv()

# Speech-to-text and LLM clients are built without explicit arguments
# (presumably they pick up their API keys from the environment -- confirm
# against the groq / anthropic client docs).
groq_client = Groq()
claude_client = anthropic.Anthropic()

# Text-to-speech client; a missing ELEVENLABS_API_KEY raises KeyError at import.
tts_client = ElevenLabs(api_key=os.environ["ELEVENLABS_API_KEY"])
def response(
    audio: tuple[int, np.ndarray],
    chatbot: list[dict] | None = None,
):
    """Transcribe one utterance, ask Claude for a reply and stream it as speech.

    This generator is wrapped in ``ReplyOnPause`` and used as the stream
    handler. For each call it:

    1. transcribes ``audio`` with Groq's ``whisper-large-v3-turbo``,
    2. sends the chat history plus the new prompt to Claude,
    3. yields the updated history wrapped in ``AdditionalOutputs``,
    4. yields the ElevenLabs rendering of the reply chunk by chunk.

    Args:
        audio: Recorded utterance, passed unchanged to ``audio_to_bytes``
            (presumably ``(sample_rate, samples)`` -- confirm against fastrtc).
        chatbot: Conversation so far, as dicts with ``"role"`` and
            ``"content"`` keys. ``None`` or an empty list starts a new
            history; a non-empty list is extended in place.

    Yields:
        First ``AdditionalOutputs(chatbot)`` with the user prompt and the
        assistant reply appended, then ``(24000, samples, "mono")`` tuples
        where ``samples`` is an ``int16`` array of shape ``(1, n)``.
        Nothing is yielded when the transcription is empty.
    """
    chatbot = chatbot or []
    messages = [{"role": d["role"], "content": d["content"]} for d in chatbot]

    prompt = groq_client.audio.transcriptions.create(
        file=("audio-file.mp3", audio_to_bytes(audio)),
        model="whisper-large-v3-turbo",
        response_format="verbose_json",
    ).text
    print("prompt", prompt)

    # A pause with no intelligible speech transcribes to an empty string.
    # The Anthropic Messages API rejects messages with empty content, and an
    # empty user turn left in the history would make every later request
    # fail as well, so end this turn without touching the history.
    if not prompt or not prompt.strip():
        return

    chatbot.append({"role": "user", "content": prompt})
    messages.append({"role": "user", "content": prompt})

    # Named `completion` so the local does not shadow this function's name.
    completion = claude_client.messages.create(
        model="claude-3-5-haiku-20241022",
        max_tokens=512,
        messages=messages,  # type: ignore
    )
    # Keep only the text blocks of the reply and join them with spaces.
    response_text = " ".join(
        block.text  # type: ignore
        for block in completion.content
        if getattr(block, "type", None) == "text"
    )
    chatbot.append({"role": "assistant", "content": response_text})

    # Publish the updated history before any audio is produced.
    yield AdditionalOutputs(chatbot)

    iterator = tts_client.text_to_speech.convert_as_stream(
        text=response_text,
        voice_id="JBFqnCBsd6RMkjVDRZzb",
        model_id="eleven_multilingual_v2",
        output_format="pcm_24000",
    )
    # The sample rate yielded below matches the "pcm_24000" format requested.
    for chunk in aggregate_bytes_to_16bit(iterator):
        audio_array = np.frombuffer(chunk, dtype=np.int16).reshape(1, -1)
        yield (24000, audio_array, "mono")
# Chat-history component; it is wired below as both an additional input and
# an additional output of the stream.
chatbot = gr.Chatbot(type="messages")


def _take_new_history(_current, updated):
    # Second positional argument wins: the newly emitted value replaces
    # whatever was there before.
    return updated


stream = Stream(
    handler=ReplyOnPause(response),
    modality="audio",
    mode="send-receive",
    additional_inputs=[chatbot],
    additional_outputs=[chatbot],
    additional_outputs_handler=_take_new_history,
    # Twilio TURN credentials and a cap of 20 are only applied when
    # get_space() is truthy; otherwise both settings are left as None.
    rtc_configuration=get_twilio_turn_credentials() if get_space() else None,
    concurrency_limit=20 if get_space() else None,
)
class Message(BaseModel):
    # One chat turn; this is the element type of InputData.chatbot.
    # role: who produced the turn (the handler in this file writes "user"
    #       and "assistant").
    # content: the text of the turn.
    role: str
    content: str
class InputData(BaseModel):
    # Request body of POST /input_hook.
    # webrtc_id: identifies the connection whose input is being set.
    # chatbot: the chat history to hand to the stream for that connection.
    webrtc_id: str
    chatbot: list[Message]
# FastAPI application that hosts the custom routes defined below.
app = FastAPI()
# Attach the stream to the application (presumably this registers the
# stream's own WebRTC endpoints on `app` -- confirm against the fastrtc docs).
stream.mount(app)
@app.get("/")
def _():
    # Serve index.html with the WebRTC configuration injected.
    #
    # The "__RTC_CONFIGURATION__" placeholder in the page is replaced with the
    # JSON form of the Twilio TURN credentials when get_space() is truthy, and
    # with "null" otherwise.
    #
    # Declared as a plain `def` on purpose: nothing in the body awaits, so as
    # an `async def` the file read and the credential lookup would run on the
    # event loop. FastAPI runs plain `def` handlers in its threadpool instead.
    rtc_config = get_twilio_turn_credentials() if get_space() else None
    # Decode explicitly as UTF-8 rather than the platform's locale encoding.
    html_content = (curr_dir / "index.html").read_text(encoding="utf-8")
    html_content = html_content.replace("__RTC_CONFIGURATION__", json.dumps(rtc_config))
    return HTMLResponse(content=html_content, status_code=200)
@app.post("/input_hook")
async def _(body: InputData):
    # Hand the posted chat history to the stream for the given connection.
    # The history is passed on as plain dicts (one per message), not as
    # pydantic Message objects.
    history = [message.model_dump() for message in body.chatbot]
    stream.set_input(body.webrtc_id, history)
    return {"status": "ok"}
@app.get("/outputs")
def _(webrtc_id: str):
    # Server-sent-events endpoint: relays the additional outputs of one
    # connection to the browser as "output" events.
    print("outputs", webrtc_id)

    async def output_stream():
        async for output in stream.output_stream(webrtc_id):
            history = output.args[0]
            # Emit the last two entries of the history, oldest first
            # (presumably the newest user prompt and the assistant reply).
            for position in (-2, -1):
                yield f"event: output\ndata: {json.dumps(history[position])}\n\n"

    return StreamingResponse(output_stream(), media_type="text/event-stream")
if __name__ == "__main__":
    # The MODE environment variable picks the front end:
    #   "UI"    -> the stream's built-in UI
    #   "PHONE" -> the stream's fastphone() entry point
    #   other   -> the FastAPI app served by uvicorn
    # Every mode listens on 0.0.0.0:7860.
    mode = os.getenv("MODE")
    if mode == "UI":
        stream.ui.launch(server_port=7860, server_name="0.0.0.0")
    elif mode == "PHONE":
        stream.fastphone(host="0.0.0.0", port=7860)
    else:
        # Imported lazily: uvicorn is only needed for this branch.
        import uvicorn

        uvicorn.run(app, host="0.0.0.0", port=7860)