documentaitestv2 / realtime_transcriber.py
IAMTFRMZA's picture
Create realtime_transcriber.py
33aa59b verified
raw
history blame
3.76 kB
import asyncio
from websockets import connect, Data, ClientConnection
import json
import numpy as np
import base64
import soundfile as sf
import io
from pydub import AudioSegment
import os
# --- Credentials ------------------------------------------------------------
# load_dotenv() runs before the lookup so that anything it adds to the process
# environment (python-dotenv; typically a local .env file) is visible below.
from dotenv import load_dotenv

load_dotenv()

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if OPENAI_API_KEY is None or OPENAI_API_KEY == "":
    # Fail at import time: nothing in this module works without the key.
    raise ValueError("OPENAI_API_KEY must be set in environment")

# --- Realtime transcription endpoint ----------------------------------------
WEBSOCKET_URI = "wss://api.openai.com/v1/realtime?intent=transcription"

# Headers sent with the WebSocket handshake.
WEBSOCKET_HEADERS = {
    "Authorization": f"Bearer {OPENAI_API_KEY}",
    "OpenAI-Beta": "realtime=v1",
}

# Shared client registry (module-level, starts empty).
# NOTE(review): presumably maps a client id to its WebSocketClient - confirm
# against the code that populates it.
connections = {}
class WebSocketClient:
    """Stream audio chunks over a WebSocket and accumulate the returned transcript.

    Typical use: construct the client, call :meth:`run` (blocking) to own an
    event loop and the connection, and feed audio from other threads through
    :meth:`enqueue_audio_chunk`. Transcribed text accumulates in
    ``self.transcript``.

    Attributes:
        uri: WebSocket URI to connect to.
        headers: Extra headers sent with the WebSocket handshake.
        websocket: The open connection, or ``None`` when not connected.
        queue: Bounded queue (10 items) of ``(sample_rate, ndarray)`` chunks
            waiting to be sent.
        loop: Event loop created by :meth:`run`; ``None`` until then.
        client_id: Caller-supplied identifier for this client.
        transcript: Text received so far.
    """

    # Sample rate (Hz) every chunk is resampled to before it is sent.
    TARGET_SAMPLE_RATE = 24000

    def __init__(self, uri: str, headers: dict, client_id: str):
        self.uri = uri
        self.headers = headers
        self.websocket: "ClientConnection | None" = None
        # Bounded: when the sender falls behind, new chunks are dropped
        # instead of piling up without limit.
        self.queue = asyncio.Queue(maxsize=10)
        self.loop = None
        self.client_id = client_id
        self.transcript = ""

    async def connect(self):
        """Open the connection, send the session settings and pump traffic.

        The settings are read verbatim from ``openai_transcription_settings.json``
        in the current working directory. The coroutine returns once either
        the receive loop or the send loop stops; any error is printed rather
        than raised. The WebSocket is always closed before returning.
        """
        try:
            self.websocket = await connect(self.uri, additional_headers=self.headers)
            print("βœ… Connected to OpenAI WebSocket")
            # Send transcription session settings
            with open("openai_transcription_settings.json", "r", encoding="utf-8") as f:
                settings = f.read()
            await self.websocket.send(settings)
            await self._pump_until_first_exit()
        except Exception as e:
            print(f"❌ WebSocket Error: {e}")
        finally:
            await self._close_websocket()

    async def _pump_until_first_exit(self):
        """Run the receive and send loops; stop both as soon as either ends."""
        tasks = [
            asyncio.ensure_future(self.receive_messages()),
            asyncio.ensure_future(self.send_audio_chunks()),
        ]
        try:
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # The send loop never ends on its own (it blocks on the queue), so
            # it must be cancelled explicitly once the receive loop is over.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
        # Surface the failure of whichever loop ended first.
        for task in done:
            if not task.cancelled() and task.exception() is not None:
                raise task.exception()

    async def _close_websocket(self):
        """Close the current connection, if any, and clear ``self.websocket``."""
        websocket, self.websocket = self.websocket, None
        if websocket is None:
            return
        try:
            await websocket.close()
        except Exception as e:
            print(f"❌ WebSocket Error: {e}")

    def run(self):
        """Create a fresh event loop, run :meth:`connect` on it, then close it.

        Blocks until the connection ends.
        NOTE(review): presumably called from a dedicated worker thread, since
        :meth:`enqueue_audio_chunk` hands chunks over thread-safely - confirm
        against the caller.
        """
        loop = asyncio.new_event_loop()
        self.loop = loop
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self.connect())
        finally:
            # Release the loop's resources; enqueue_audio_chunk() checks
            # is_closed() and drops chunks from here on.
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()

    def process_websocket_message(self, message: "Data"):
        """Fold one server event (a JSON document) into ``self.transcript``.

        ``...transcription.delta`` events append their ``delta`` text;
        ``...transcription.completed`` events add a single separating space
        unless the transcript is empty or already ends with one. Other event
        types are ignored. Malformed events are reported and skipped.
        """
        try:
            event = json.loads(message)
            event_type = event["type"]
            if event_type == "conversation.item.input_audio_transcription.delta":
                self.transcript += event["delta"]
            elif event_type == "conversation.item.input_audio_transcription.completed":
                if self.transcript and not self.transcript.endswith(" "):
                    self.transcript += " "
        except (ValueError, KeyError, TypeError) as e:
            # ValueError: invalid JSON; KeyError: missing field;
            # TypeError: unexpected JSON shape or non-text delta.
            print(f"⚠️ Error processing message: {e}")

    @staticmethod
    def _to_pcm16_mono(audio_array):
        """Return *audio_array* as peak-normalised mono little-endian int16.

        Arrays with more than one dimension are averaged over axis 1
        (assumes a ``(samples, channels)`` layout - TODO confirm with caller).
        """
        samples = np.asarray(audio_array)
        if samples.ndim > 1:
            samples = samples.mean(axis=1)
        samples = samples.astype(np.float32)
        peak = np.max(np.abs(samples)) if samples.size else 0.0
        if peak > 0:
            samples = samples / peak
        return (samples * 32767).astype("<i2")

    def _encode_chunk(self, sample_rate, audio_array):
        """Return the chunk as base64 raw PCM16 at the target rate.

        Returns ``None`` for an empty chunk.
        """
        pcm = self._to_pcm16_mono(audio_array)
        if pcm.size == 0:
            return None
        segment = AudioSegment(
            data=pcm.tobytes(),
            sample_width=2,
            frame_rate=int(sample_rate),
            channels=1,
        )
        resampled = segment.set_frame_rate(self.TARGET_SAMPLE_RATE)
        # Raw samples only: a WAV container would put its header bytes into
        # the audio stream.
        # NOTE(review): assumes the session settings select 16-bit PCM input -
        # confirm against openai_transcription_settings.json.
        return base64.b64encode(resampled.raw_data).decode("utf-8")

    async def send_audio_chunks(self):
        """Forever: take a queued chunk, convert it and send it as an append event.

        Chunks taken while there is no connection, and empty chunks, are
        discarded. The loop only ends by cancellation or a send error.
        """
        while True:
            sample_rate, audio_array = await self.queue.get()
            if self.websocket is None:
                continue
            b64_audio = self._encode_chunk(sample_rate, audio_array)
            if b64_audio is None:
                continue
            await self.websocket.send(json.dumps({
                "type": "input_audio_buffer.append",
                "audio": b64_audio,
            }))

    async def receive_messages(self):
        """Process every incoming message until the connection is closed."""
        async for message in self.websocket:
            self.process_websocket_message(message)

    def _put_chunk_nowait(self, item):
        """Queue *item* from inside the event loop, dropping it if the queue is full."""
        try:
            self.queue.put_nowait(item)
        except asyncio.QueueFull:
            pass

    def enqueue_audio_chunk(self, sample_rate: int, chunk_array: np.ndarray):
        """Hand an audio chunk to the send loop; safe to call from any thread.

        The chunk is silently dropped when the queue is full, or when the
        event loop has not been started yet or has already been closed.

        Args:
            sample_rate: Sample rate of *chunk_array* in Hz.
            chunk_array: Audio samples for this chunk.
        """
        loop = self.loop
        if loop is None or loop.is_closed():
            return
        try:
            # The fullness check happens inside the loop thread, so concurrent
            # producers can neither overfill the queue nor block on it.
            loop.call_soon_threadsafe(self._put_chunk_nowait, (sample_rate, chunk_array))
        except RuntimeError:
            # The loop was closed between the check above and the call.
            pass