import os
from typing import Any, List, Optional, Union

from openai import OpenAI, _legacy_response

from camel.types import AudioModelType, VoiceType


class OpenAIAudioModels:
    r"""Provides access to OpenAI's Text-to-Speech (TTS) and Speech-to-Text
    (STT) models."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        url: Optional[str] = None,
    ) -> None:
        r"""Initialize an instance of OpenAIAudioModels.

        Args:
            api_key (Optional[str]): API key for the OpenAI service. If not
                provided, the `OPENAI_API_KEY` environment variable is used.
            url (Optional[str]): Base URL for the OpenAI API. If not
                provided, the `OPENAI_API_BASE_URL` environment variable is
                used.
        """
        self._url = url or os.environ.get("OPENAI_API_BASE_URL")
        self._api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self._client = OpenAI(
            timeout=120,
            max_retries=3,
            base_url=self._url,
            api_key=self._api_key,
        )

    def text_to_speech(
        self,
        input: str,
        model_type: AudioModelType = AudioModelType.TTS_1,
        voice: VoiceType = VoiceType.ALLOY,
        storage_path: Optional[str] = None,
        **kwargs: Any,
    ) -> Union[
        List[_legacy_response.HttpxBinaryResponseContent],
        _legacy_response.HttpxBinaryResponseContent,
    ]:
        r"""Convert text to speech using OpenAI's TTS model. This method
        converts the given input text to speech using the specified model
        and voice.

        Args:
            input (str): The text to be converted to speech.
            model_type (AudioModelType, optional): The TTS model to use.
                Defaults to `AudioModelType.TTS_1`.
            voice (VoiceType, optional): The voice to be used for generating
                speech. Defaults to `VoiceType.ALLOY`.
            storage_path (str, optional): The local path to store the
                generated speech file if provided, defaults to `None`.
            **kwargs (Any): Extra kwargs passed to the TTS API.

        Returns:
            Union[List[_legacy_response.HttpxBinaryResponseContent],
                _legacy_response.HttpxBinaryResponseContent]: A list of
                response content objects if the input is split into
                multiple chunks, or a single response content object
                otherwise.

        Raises:
            Exception: If there's an error during the TTS API call.
        """
        try:
            max_chunk_size = 4095
            audio_chunks = []
            chunk_index = 0
            if len(input) > max_chunk_size:
                while input:
                    if len(input) <= max_chunk_size:
                        chunk = input
                        input = ''
                    else:
                        # Cut at the last period within the limit; `rfind`
                        # returns -1 when no period exists, in which case
                        # fall back to a hard cut at the limit.
                        split_at = input.rfind('.', 0, max_chunk_size) + 1
                        if split_at <= 0:
                            split_at = max_chunk_size
                        chunk = input[:split_at]
                        input = input[split_at:].lstrip()

                    response = self._client.audio.speech.create(
                        model=model_type.value,
                        voice=voice.value,
                        input=chunk,
                        **kwargs,
                    )
                    if storage_path:
                        try:
                            file_name, file_extension = os.path.splitext(
                                storage_path
                            )
                            new_storage_path = (
                                f"{file_name}_{chunk_index}{file_extension}"
                            )
                            response.write_to_file(new_storage_path)
                            chunk_index += 1
                        except Exception as e:
                            raise Exception(
                                "Error during writing the file"
                            ) from e

                    audio_chunks.append(response)
                return audio_chunks

            else:
                response = self._client.audio.speech.create(
                    model=model_type.value,
                    voice=voice.value,
                    input=input,
                    **kwargs,
                )

                if storage_path:
                    try:
                        response.write_to_file(storage_path)
                    except Exception as e:
                        raise Exception(
                            "Error during writing the file"
                        ) from e

                return response

        except Exception as e:
            raise Exception("Error during TTS API call") from e

    def _split_audio(
        self, audio_file_path: str, chunk_size_mb: int = 24
    ) -> List[str]:
        r"""Split the audio file into smaller chunks, since the Whisper API
        only supports files smaller than 25 MB.

        Args:
            audio_file_path (str): Path to the input audio file.
            chunk_size_mb (int, optional): Size of each chunk in megabytes.
                Defaults to `24`.

        Returns:
            List[str]: List of paths to the split audio files.
        """
        from pydub import AudioSegment

        audio = AudioSegment.from_file(audio_file_path)
        audio_format = os.path.splitext(audio_file_path)[1][1:].lower()

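        # Convert the chunk size from megabytes to bytes.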
        chunk_size_bytes = chunk_size_mb * 1024 * 1024

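        # Estimate the number of chunks from the file size on disk.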
        num_chunks = os.path.getsize(audio_file_path) // chunk_size_bytes + 1

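        # Create a directory next to the input file to hold the chunks.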
        output_dir = os.path.splitext(audio_file_path)[0] + "_chunks"
        os.makedirs(output_dir, exist_ok=True)

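        # Split evenly by duration rather than by bytes.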
        chunk_size_milliseconds = len(audio) // num_chunks

        split_files = []
        for i in range(num_chunks):
            start = i * chunk_size_milliseconds
            end = (i + 1) * chunk_size_milliseconds
            # The last chunk absorbs any remainder left by integer division.
            if i + 1 == num_chunks:
                chunk = audio[start:]
            else:
                chunk = audio[start:end]

            chunk_path = os.path.join(output_dir, f"chunk_{i}.{audio_format}")
            chunk.export(chunk_path, format=audio_format)
            split_files.append(chunk_path)
        return split_files

    def speech_to_text(
        self,
        audio_file_path: str,
        translate_into_english: bool = False,
        **kwargs: Any,
    ) -> str:
        r"""Convert speech audio to text.

        Args:
            audio_file_path (str): The audio file path, supporting one of
                these formats: flac, mp3, mp4, mpeg, mpga, m4a, ogg, wav,
                or webm.
            translate_into_english (bool, optional): Whether to translate
                the speech into English. Defaults to `False`.
            **kwargs (Any): Extra keyword arguments passed to the
                Speech-to-Text (STT) API.

        Returns:
            str: The output text.

        Raises:
            ValueError: If the audio file format is not supported.
            Exception: If there's an error during the STT API call.
        """
        supported_formats = [
            "flac",
            "mp3",
            "mp4",
            "mpeg",
            "mpga",
            "m4a",
            "ogg",
            "wav",
            "webm",
        ]
        file_format = audio_file_path.split(".")[-1].lower()

        if file_format not in supported_formats:
            raise ValueError(f"Unsupported audio file format: {file_format}")
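
        # Whisper only accepts audio files smaller than 25 MB; larger
        # files are split into chunks and transcribed piece by piece.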
        try:
            if os.path.getsize(audio_file_path) > 24 * 1024 * 1024:
                audio_chunks = self._split_audio(audio_file_path)
                texts = []
                for chunk_path in audio_chunks:
                    # Use a context manager so each chunk's file handle is
                    # closed before the chunk file is deleted.
                    with open(chunk_path, "rb") as audio_data:
                        if translate_into_english:
                            translation = (
                                self._client.audio.translations.create(
                                    model="whisper-1",
                                    file=audio_data,
                                    **kwargs,
                                )
                            )
                            texts.append(translation.text)
                        else:
                            transcription = (
                                self._client.audio.transcriptions.create(
                                    model="whisper-1",
                                    file=audio_data,
                                    **kwargs,
                                )
                            )
                            texts.append(transcription.text)
                    os.remove(chunk_path)
                return " ".join(texts)
            else:
                with open(audio_file_path, "rb") as audio_data:
                    if translate_into_english:
                        translation = self._client.audio.translations.create(
                            model="whisper-1", file=audio_data, **kwargs
                        )
                        return translation.text
                    else:
                        transcription = (
                            self._client.audio.transcriptions.create(
                                model="whisper-1", file=audio_data, **kwargs
                            )
                        )
                        return transcription.text
        except Exception as e:
            raise Exception("Error during STT API call") from e
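

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original module). It assumes a
    # valid `OPENAI_API_KEY` is set in the environment; the file name
    # `speech.mp3` below is a hypothetical example path.
    audio_models = OpenAIAudioModels()

    # Synthesize speech and write it to a local file.
    audio_models.text_to_speech(
        "Hello from the OpenAI audio models!", storage_path="speech.mp3"
    )

    # Transcribe the generated audio back into text.
    print(audio_models.speech_to_text("speech.mp3"))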