"""Advanced voice memory system for consistent voice generation.""" | |
import os | |
import torch | |
import torchaudio | |
import numpy as np | |
import random | |
import logging | |
from typing import Dict, List, Optional | |
from dataclasses import dataclass | |
from app.models import Segment | |
# Setup logging | |
logger = logging.getLogger(__name__) | |
# Path to store voice memories - use persistent location | |
VOICE_MEMORIES_DIR = "/app/voice_memories" | |
os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True) | |

@dataclass
class VoiceMemory:
    """Store voice characteristics for consistent generation."""
    name: str  # Voice name (alloy, echo, etc.)
    speaker_id: int  # Speaker ID (0-5)
    # Store multiple audio segments for context
    audio_segments: List[torch.Tensor]
    # Store text prompts that produced good results
    text_segments: List[str]
    # Base characteristics for this voice
    pitch_base: float  # Base pitch characteristic (Hz)
    timbre: str  # Voice quality descriptor

    def get_context_segments(self, device: torch.device, max_segments: int = 2) -> List[Segment]:
        """Get context segments for this voice."""
        if not self.audio_segments:
            return []

        # Select a limited number of segments to avoid context overflow
        num_segments = min(len(self.audio_segments), max_segments)
        indices = list(range(len(self.audio_segments)))
        random.shuffle(indices)
        selected_indices = indices[:num_segments]

        segments = []
        for i in selected_indices:
            segments.append(
                Segment(
                    speaker=self.speaker_id,
                    text=self.text_segments[i] if i < len(self.text_segments) else f"Voice sample {i}",
                    audio=self.audio_segments[i].to(device)
                )
            )
        return segments

    def update_with_new_audio(self, audio: torch.Tensor, text: str, max_stored: int = 5):
        """Update voice memory with newly generated audio."""
        # Add new audio and text
        self.audio_segments.append(audio.detach().cpu())
        self.text_segments.append(text)

        # Keep only the most recent segments
        if len(self.audio_segments) > max_stored:
            self.audio_segments = self.audio_segments[-max_stored:]
            self.text_segments = self.text_segments[-max_stored:]
def save(self): | |
"""Save voice memory to persistent storage.""" | |
data = { | |
"name": self.name, | |
"speaker_id": self.speaker_id, | |
"audio_segments": self.audio_segments, | |
"text_segments": self.text_segments, | |
"pitch_base": self.pitch_base, | |
"timbre": self.timbre | |
} | |
# Save to the persistent directory | |
save_path = os.path.join(VOICE_MEMORIES_DIR, f"{self.name}.pt") | |
try: | |
torch.save(data, save_path) | |
logger.info(f"Saved voice memory for {self.name} to {save_path}") | |
except Exception as e: | |
logger.error(f"Error saving voice memory for {self.name}: {e}") | |

    @classmethod
    def load(cls, name: str) -> Optional['VoiceMemory']:
        """Load voice memory from persistent storage."""
        path = os.path.join(VOICE_MEMORIES_DIR, f"{name}.pt")
        if not os.path.exists(path):
            logger.info(f"No saved voice memory found for {name} at {path}")
            return None
        try:
            data = torch.load(path)
            return cls(
                name=data["name"],
                speaker_id=data["speaker_id"],
                audio_segments=data["audio_segments"],
                text_segments=data["text_segments"],
                pitch_base=data["pitch_base"],
                timbre=data["timbre"]
            )
        except Exception as e:
            logger.error(f"Error loading voice memory for {name}: {e}")
            return None
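
# Minimal usage sketch for VoiceMemory (hypothetical values; assumes an "alloy"
# memory was previously saved under VOICE_MEMORIES_DIR):
#
#     memory = VoiceMemory.load("alloy")
#     if memory is not None:
#         context = memory.get_context_segments(torch.device("cpu"), max_segments=2)
#         memory.update_with_new_audio(torch.zeros(24000), "A short placeholder utterance")
#         memory.save()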

# Dictionary of voice memories
VOICE_MEMORIES: Dict[str, VoiceMemory] = {}

# Voice characteristics
VOICE_CHARACTERISTICS = {
    "alloy": {"pitch": 220.0, "timbre": "balanced", "description": "A balanced, natural voice with medium pitch"},
    "echo": {"pitch": 330.0, "timbre": "resonant", "description": "A resonant voice with a reverberant quality"},
    "fable": {"pitch": 523.0, "timbre": "bright", "description": "A bright, higher-pitched voice with clear articulation"},
    "onyx": {"pitch": 165.0, "timbre": "deep", "description": "A deep, authoritative voice with lower pitch"},
    "nova": {"pitch": 392.0, "timbre": "warm", "description": "A warm, smooth voice with pleasant midrange tone"},
    "shimmer": {"pitch": 587.0, "timbre": "light", "description": "A light, airy voice with higher frequencies"}
}

# Voice intro texts - carefully crafted to capture voice characteristics
VOICE_INTROS = {
    "alloy": [
        "Hello, I'm Alloy. My voice is designed to be clear and balanced.",
        "This is the Alloy voice. I aim to sound natural and easy to understand.",
        "Welcome, I'm the voice known as Alloy. I have a balanced, medium-range tone."
    ],
    "echo": [
        "Hello, I'm Echo. My voice has a rich, resonant quality.",
        "This is the Echo voice. Notice my distinctive resonance and depth.",
        "Welcome, I'm the voice known as Echo. My tone is designed to resonate clearly."
    ],
    "fable": [
        "Hello, I'm Fable. My voice is bright and articulate.",
        "This is the Fable voice. I have a higher pitch with clear pronunciation.",
        "Welcome, I'm the voice known as Fable. I speak with a bright, energetic tone."
    ],
    "onyx": [
        "Hello, I'm Onyx. My voice is deep and authoritative.",
        "This is the Onyx voice. I speak with a lower pitch and commanding presence.",
        "Welcome, I'm the voice known as Onyx. My tone is deep and resonant."
    ],
    "nova": [
        "Hello, I'm Nova. My voice is warm and harmonious.",
        "This is the Nova voice. I have a smooth, pleasant mid-range quality.",
        "Welcome, I'm the voice known as Nova. I speak with a warm, friendly tone."
    ],
    "shimmer": [
        "Hello, I'm Shimmer. My voice is light and expressive.",
        "This is the Shimmer voice. I have a higher-pitched, airy quality.",
        "Welcome, I'm the voice known as Shimmer. My tone is bright and crisp."
    ]
}

def initialize_voices(sample_rate: int = 24000):
    """Initialize voice memories with consistent base samples."""
    global VOICE_MEMORIES

    # Check if persistent directory exists, create if needed
    os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True)
    logger.info(f"Using voice memories directory: {VOICE_MEMORIES_DIR}")

    # First try to load existing memories from persistent storage
    for voice_name in ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]:
        memory = VoiceMemory.load(voice_name)
        if memory:
            VOICE_MEMORIES[voice_name] = memory
            logger.info(f"Loaded existing voice memory for {voice_name} with {len(memory.audio_segments)} segments")
            continue

        # If no memory exists, create a new one
        speaker_id = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"].index(voice_name)
        characteristics = VOICE_CHARACTERISTICS[voice_name]

        # Create deterministic seed audio
        np.random.seed(speaker_id + 42)
        duration = 1.0  # seconds
        t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False)

        # Create characteristic waveform
        pitch = characteristics["pitch"]
        if voice_name == "alloy":
            audio = 0.5 * np.sin(2 * np.pi * pitch * t) + 0.3 * np.sin(2 * np.pi * pitch * 2 * t)
        elif voice_name == "echo":
            audio = np.sin(2 * np.pi * pitch * t) * np.exp(-t * 3)
        elif voice_name == "fable":
            audio = 0.7 * np.sin(2 * np.pi * pitch * t)
        elif voice_name == "onyx":
            audio = 0.8 * np.sin(2 * np.pi * pitch * t) + 0.1 * np.sin(2 * np.pi * pitch * 0.5 * t)
        elif voice_name == "nova":
            audio = 0.4 * np.sin(2 * np.pi * pitch * t) + 0.4 * np.sin(2 * np.pi * pitch * 0.5 * t)
        else:  # shimmer
            audio = 0.3 * np.sin(2 * np.pi * pitch * t) + 0.2 * np.sin(2 * np.pi * pitch * 1.5 * t) + 0.1 * np.sin(2 * np.pi * pitch * 2 * t)

        # Normalize
        audio = audio / np.max(np.abs(audio))

        # Convert to tensor
        audio_tensor = torch.tensor(audio, dtype=torch.float32)

        # Create voice memory
        memory = VoiceMemory(
            name=voice_name,
            speaker_id=speaker_id,
            audio_segments=[audio_tensor],
            text_segments=[f"This is the voice of {voice_name}"],
            pitch_base=characteristics["pitch"],
            timbre=characteristics["timbre"]
        )

        # Save the voice memory to persistent storage
        memory.save()

        # Store in dictionary
        VOICE_MEMORIES[voice_name] = memory

        # Save as wav for reference
        save_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_seed.wav")
        torchaudio.save(save_path, audio_tensor.unsqueeze(0), sample_rate)
        logger.info(f"Initialized new voice memory for {voice_name}")

def get_voice_context(voice_name: str, device: torch.device, max_segments: int = 2) -> List[Segment]:
    """Get context segments for a given voice."""
    if not VOICE_MEMORIES:
        initialize_voices()

    if voice_name in VOICE_MEMORIES:
        return VOICE_MEMORIES[voice_name].get_context_segments(device, max_segments=max_segments)

    # Default to alloy if voice not found
    logger.warning(f"Voice {voice_name} not found, defaulting to alloy")
    return VOICE_MEMORIES["alloy"].get_context_segments(device, max_segments=max_segments)

def update_voice_memory(voice_name: str, audio: torch.Tensor, text: str):
    """Update voice memory with newly generated audio and save to persistent storage."""
    if not VOICE_MEMORIES:
        initialize_voices()

    if voice_name in VOICE_MEMORIES:
        VOICE_MEMORIES[voice_name].update_with_new_audio(audio, text)
        VOICE_MEMORIES[voice_name].save()
        logger.info(f"Updated voice memory for {voice_name}, now has {len(VOICE_MEMORIES[voice_name].audio_segments)} segments")

def generate_voice_samples(app_state):
    """Generate high-quality voice samples for each voice.

    Args:
        app_state: The FastAPI app state containing the generator
    """
    generator = app_state.generator
    if not generator:
        logger.error("Cannot generate voice samples: generator not available")
        return

    logger.info("Beginning voice sample generation...")

    # Ensure persistent directory exists
    os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True)

    for voice_name in ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]:
        speaker_id = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"].index(voice_name)

        # Get multiple sample texts for this voice
        sample_texts = VOICE_INTROS[voice_name]

        # Generate a collection of samples for this voice
        logger.info(f"Generating samples for voice: {voice_name}")
        audio_segments = []
        text_segments = []

        for i, sample_text in enumerate(sample_texts):
            try:
                # Check if we already have a sample
                sample_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_sample_{i}.wav")
                if os.path.exists(sample_path):
                    logger.info(f"Found existing sample {i+1} for {voice_name}, loading from {sample_path}")
                    audio_tensor, sr = torchaudio.load(sample_path)
                    if sr != generator.sample_rate:
                        audio_tensor = torchaudio.functional.resample(
                            audio_tensor.squeeze(0), orig_freq=sr, new_freq=generator.sample_rate
                        )
                    else:
                        audio_tensor = audio_tensor.squeeze(0)
                    audio_segments.append(audio_tensor)
                    text_segments.append(sample_text)
                    continue

                # Generate without context first for seed samples
                logger.info(f"Generating sample {i+1}/{len(sample_texts)} for {voice_name}: '{sample_text}'")

                # Use a lower temperature for more stable output
                audio = generator.generate(
                    text=sample_text,
                    speaker=speaker_id,
                    context=[],  # No context for initial samples
                    max_audio_length_ms=10000,
                    temperature=0.7,  # Lower temperature for more stable output
                    topk=30,
                )

                # Save this segment
                audio_segments.append(audio.detach().cpu())
                text_segments.append(sample_text)

                # Save as WAV for reference to persistent storage
                torchaudio.save(sample_path, audio.unsqueeze(0).cpu(), generator.sample_rate)
                logger.info(f"Generated sample {i+1} for {voice_name}, length: {audio.shape[0]/generator.sample_rate:.2f}s")
            except Exception as e:
                logger.error(f"Error generating sample {i+1} for {voice_name}: {e}")

        # Use the generated samples to update the voice memory
        if voice_name in VOICE_MEMORIES and audio_segments:
            # Replace existing samples with these high quality ones
            VOICE_MEMORIES[voice_name].audio_segments = audio_segments
            VOICE_MEMORIES[voice_name].text_segments = text_segments
            VOICE_MEMORIES[voice_name].save()
            logger.info(f"Updated voice memory for {voice_name} with {len(audio_segments)} high-quality samples")

            # Now generate a second pass with context from these samples
            if len(audio_segments) >= 2:
                try:
                    # Check if we already have a character sample
                    character_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_character.wav")
                    if os.path.exists(character_path):
                        logger.info(f"Found existing character sample for {voice_name}, loading from {character_path}")
                        audio_tensor, sr = torchaudio.load(character_path)
                        if sr != generator.sample_rate:
                            audio_tensor = torchaudio.functional.resample(
                                audio_tensor.squeeze(0), orig_freq=sr, new_freq=generator.sample_rate
                            )
                        else:
                            audio_tensor = audio_tensor.squeeze(0)
                        character_sample_text = f"I'm the voice assistant known as {voice_name}. I'm designed to have a distinctive voice that you can easily recognize."
                        VOICE_MEMORIES[voice_name].audio_segments.append(audio_tensor)
                        VOICE_MEMORIES[voice_name].text_segments.append(character_sample_text)
                        VOICE_MEMORIES[voice_name].save()
                        continue

                    # Get intro and conclusion prompts that build voice consistency
                    context = [
                        Segment(
                            speaker=speaker_id,
                            text=text_segments[0],
                            audio=audio_segments[0].to(generator.device)
                        )
                    ]

                    # Create a longer sample with the voice characteristics now established
                    character_sample_text = f"I'm the voice assistant known as {voice_name}. I'm designed to have a distinctive voice that you can easily recognize. My speech patterns and tone should remain consistent throughout our conversation."
                    logger.info(f"Generating character sample for {voice_name} with context")
                    character_audio = generator.generate(
                        text=character_sample_text,
                        speaker=speaker_id,
                        context=context,
                        max_audio_length_ms=15000,
                        temperature=0.7,
                        topk=30,
                    )

                    # Save this comprehensive character sample to persistent storage
                    torchaudio.save(character_path, character_audio.unsqueeze(0).cpu(), generator.sample_rate)

                    # Add this to the memory as well
                    VOICE_MEMORIES[voice_name].audio_segments.append(character_audio.detach().cpu())
                    VOICE_MEMORIES[voice_name].text_segments.append(character_sample_text)
                    VOICE_MEMORIES[voice_name].save()
                    logger.info(f"Generated character sample for {voice_name}, length: {character_audio.shape[0]/generator.sample_rate:.2f}s")
                except Exception as e:
                    logger.error(f"Error generating character sample for {voice_name}: {e}")

def create_custom_voice(
    app_state,
    name: str,
    initial_text: str,
    speaker_id: int = 0,
    pitch: Optional[float] = None,
    timbre: str = "custom"
) -> Dict:
    """Create a new custom voice.

    Args:
        app_state: The FastAPI app state containing the generator
        name: Name for the new voice
        initial_text: Text for the initial voice sample
        speaker_id: Base speaker ID (0-5)
        pitch: Base pitch in Hz (optional)
        timbre: Voice quality descriptor

    Returns:
        Dict with creation status and voice info
    """
    generator = app_state.generator
    if not generator:
        return {"status": "error", "message": "Generator not available"}

    # Check if voice already exists
    if not VOICE_MEMORIES:
        initialize_voices()
    if name in VOICE_MEMORIES:
        return {"status": "error", "message": f"Voice '{name}' already exists"}

    # Generate a voice sample
    try:
        logger.info(f"Creating custom voice '{name}' with text: '{initial_text}'")
        audio = generator.generate(
            text=initial_text,
            speaker=speaker_id,
            context=[],
            max_audio_length_ms=10000,
            temperature=0.7,
        )

        # Determine base pitch if not provided
        if pitch is None:
            if speaker_id == 0:  # alloy
                pitch = 220.0
            elif speaker_id == 1:  # echo
                pitch = 330.0
            elif speaker_id == 2:  # fable
                pitch = 523.0
            elif speaker_id == 3:  # onyx
                pitch = 165.0
            elif speaker_id == 4:  # nova
                pitch = 392.0
            else:  # shimmer
                pitch = 587.0

        # Create a new voice memory
        memory = VoiceMemory(
            name=name,
            speaker_id=speaker_id,
            audio_segments=[audio.detach().cpu()],
            text_segments=[initial_text],
            pitch_base=pitch,
            timbre=timbre
        )

        # Save the voice memory to persistent storage
        memory.save()
        VOICE_MEMORIES[name] = memory

        # Save sample as WAV for reference to persistent storage
        sample_path = os.path.join(VOICE_MEMORIES_DIR, f"{name}_sample.wav")
        torchaudio.save(sample_path, audio.unsqueeze(0).cpu(), generator.sample_rate)

        logger.info(f"Created custom voice '{name}' successfully")
        return {
            "status": "success",
            "message": f"Voice '{name}' created successfully",
            "voice": {
                "name": name,
                "speaker_id": speaker_id,
                "pitch": pitch,
                "timbre": timbre,
                "sample_length_seconds": audio.shape[0] / generator.sample_rate
            }
        }
    except Exception as e:
        logger.error(f"Error creating custom voice '{name}': {e}")
        return {
            "status": "error",
            "message": f"Error creating voice: {str(e)}"
        }
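
# End-to-end usage sketch (hypothetical; assumes an `app_state` whose `generator`
# exposes .generate(), .sample_rate, and .device as used throughout this module):
#
#     initialize_voices(sample_rate=24000)
#     generate_voice_samples(app_state)
#     context = get_voice_context("alloy", app_state.generator.device)
#     audio = app_state.generator.generate(
#         text="Hello there!",
#         speaker=0,
#         context=context,
#         max_audio_length_ms=10000,
#         temperature=0.7,
#         topk=30,
#     )
#     update_voice_memory("alloy", audio, "Hello there!")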