# sesame_openai / app / voice_memory.py
# (repository-page residue kept for provenance: "karumati's picture", "yo", "01115c6")
"""Advanced voice memory system for consistent voice generation."""
import os
import torch
import torchaudio
import numpy as np
import random
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass
from app.models import Segment
# Setup logging
logger = logging.getLogger(__name__)
# Path to store voice memories - use persistent location.
# Defaults to the original hard-coded location; the VOICE_MEMORIES_DIR
# environment variable can relocate it (an unset or empty variable falls back
# to the default).
VOICE_MEMORIES_DIR = os.environ.get("VOICE_MEMORIES_DIR") or "/app/voice_memories"
# Created at import time so later saves can assume the directory exists.
os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True)
@dataclass
class VoiceMemory:
    """Store voice characteristics for consistent generation.

    A memory pairs reference audio clips with the text associated with them so
    they can be handed back to the generator as context segments.
    ``audio_segments[i]`` is paired with ``text_segments[i]``.
    """

    name: str  # Voice name (alloy, echo, etc.)
    speaker_id: int  # Speaker ID (0-5)
    # Store multiple audio segments for context
    audio_segments: List[torch.Tensor]
    # Store text prompts that produced good results
    text_segments: List[str]
    # Base characteristics for this voice
    pitch_base: float  # Base pitch characteristic (Hz)
    timbre: str  # Voice quality descriptor

    def get_context_segments(self, device: torch.device, max_segments: int = 2) -> "List[Segment]":
        """Return up to ``max_segments`` randomly chosen context segments.

        Args:
            device: Device each selected audio tensor is moved to.
            max_segments: Upper bound on the number of segments returned;
                zero or a negative value yields an empty list.

        Returns:
            A list of ``Segment`` objects in random order. When a clip has no
            matching text entry, the placeholder ``"Voice sample {i}"`` is used.
        """
        if not self.audio_segments:
            return []
        # Clamp at zero: a negative count must not turn into a "drop the last
        # k" slice that still returns segments.
        count = max(0, min(len(self.audio_segments), max_segments))
        chosen = random.sample(range(len(self.audio_segments)), count)
        return [
            Segment(
                speaker=self.speaker_id,
                text=self.text_segments[i] if i < len(self.text_segments) else f"Voice sample {i}",
                audio=self.audio_segments[i].to(device),
            )
            for i in chosen
        ]

    def update_with_new_audio(self, audio: torch.Tensor, text: str, max_stored: int = 5):
        """Append a new clip and its text, keeping only the newest ``max_stored``.

        Args:
            audio: Newly generated audio; stored detached and on the CPU.
            text: Text associated with ``audio``.
            max_stored: Maximum number of clips retained. Zero or a negative
                value clears the memory.
        """
        self.audio_segments.append(audio.detach().cpu())
        self.text_segments.append(text)
        if max_stored <= 0:
            # ``seq[-0:]`` is the whole list, so this case needs its own branch.
            self.audio_segments = []
            self.text_segments = []
        elif len(self.audio_segments) > max_stored:
            self.audio_segments = self.audio_segments[-max_stored:]
            self.text_segments = self.text_segments[-max_stored:]

    def save(self):
        """Save voice memory to ``VOICE_MEMORIES_DIR/<name>.pt``.

        Saving is best-effort: failures are logged and not re-raised.
        """
        data = {
            "name": self.name,
            "speaker_id": self.speaker_id,
            "audio_segments": self.audio_segments,
            "text_segments": self.text_segments,
            "pitch_base": self.pitch_base,
            "timbre": self.timbre,
        }
        save_path = os.path.join(VOICE_MEMORIES_DIR, f"{self.name}.pt")
        try:
            torch.save(data, save_path)
            logger.info(f"Saved voice memory for {self.name} to {save_path}")
        except Exception as e:
            logger.error(f"Error saving voice memory for {self.name}: {e}")

    @classmethod
    def load(cls, name: str) -> Optional['VoiceMemory']:
        """Load voice memory from ``VOICE_MEMORIES_DIR/<name>.pt``.

        Returns:
            The restored ``VoiceMemory``, or ``None`` when the file does not
            exist or cannot be read (the error is logged).
        """
        path = os.path.join(VOICE_MEMORIES_DIR, f"{name}.pt")
        if not os.path.exists(path):
            logger.info(f"No saved voice memory found for {name} at {path}")
            return None
        try:
            # NOTE(review): torch.load unpickles the file; only load memories
            # written by this service. map_location keeps loading independent
            # of the device the tensors were saved from.
            data = torch.load(path, map_location="cpu")
            return cls(
                name=data["name"],
                speaker_id=data["speaker_id"],
                audio_segments=data["audio_segments"],
                text_segments=data["text_segments"],
                pitch_base=data["pitch_base"],
                timbre=data["timbre"],
            )
        except Exception as e:
            logger.error(f"Error loading voice memory for {name}: {e}")
            return None
# Dictionary of voice memories, keyed by voice name.
# Starts empty at import time; entries are added by initialize_voices() and
# create_custom_voice().
VOICE_MEMORIES: Dict[str, VoiceMemory] = {}
# Voice characteristics: per-voice base pitch, timbre label and a
# human-readable description, keyed by voice name.
VOICE_CHARACTERISTICS = {
    voice: {"pitch": pitch, "timbre": timbre, "description": description}
    for voice, pitch, timbre, description in (
        ("alloy", 220.0, "balanced", "A balanced, natural voice with medium pitch"),
        ("echo", 330.0, "resonant", "A resonant voice with a reverberant quality"),
        ("fable", 523.0, "bright", "A bright, higher-pitched voice with clear articulation"),
        ("onyx", 165.0, "deep", "A deep, authoritative voice with lower pitch"),
        ("nova", 392.0, "warm", "A warm, smooth voice with pleasant midrange tone"),
        ("shimmer", 587.0, "light", "A light, airy voice with higher frequencies"),
    )
}
# Voice intro texts - carefully crafted to capture voice characteristics.
# Each voice maps to a list of three self-introduction sentences.
VOICE_INTROS = {
    voice: list(lines)
    for voice, lines in (
        (
            "alloy",
            (
                "Hello, I'm Alloy. My voice is designed to be clear and balanced.",
                "This is the Alloy voice. I aim to sound natural and easy to understand.",
                "Welcome, I'm the voice known as Alloy. I have a balanced, medium-range tone.",
            ),
        ),
        (
            "echo",
            (
                "Hello, I'm Echo. My voice has a rich, resonant quality.",
                "This is the Echo voice. Notice my distinctive resonance and depth.",
                "Welcome, I'm the voice known as Echo. My tone is designed to resonate clearly.",
            ),
        ),
        (
            "fable",
            (
                "Hello, I'm Fable. My voice is bright and articulate.",
                "This is the Fable voice. I have a higher pitch with clear pronunciation.",
                "Welcome, I'm the voice known as Fable. I speak with a bright, energetic tone.",
            ),
        ),
        (
            "onyx",
            (
                "Hello, I'm Onyx. My voice is deep and authoritative.",
                "This is the Onyx voice. I speak with a lower pitch and commanding presence.",
                "Welcome, I'm the voice known as Onyx. My tone is deep and resonant.",
            ),
        ),
        (
            "nova",
            (
                "Hello, I'm Nova. My voice is warm and harmonious.",
                "This is the Nova voice. I have a smooth, pleasant mid-range quality.",
                "Welcome, I'm the voice known as Nova. I speak with a warm, friendly tone.",
            ),
        ),
        (
            "shimmer",
            (
                "Hello, I'm Shimmer. My voice is light and expressive.",
                "This is the Shimmer voice. I have a higher-pitched, airy quality.",
                "Welcome, I'm the voice known as Shimmer. My tone is bright and crisp.",
            ),
        ),
    )
}
def _synthesize_seed_audio(voice_name: str, pitch: float, sample_rate: int) -> np.ndarray:
    """Build the one-second, peak-normalised placeholder waveform for a voice."""
    duration = 1.0  # seconds
    t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False)
    w = 2 * np.pi * pitch  # angular frequency of the fundamental

    if voice_name == "alloy":
        audio = 0.5 * np.sin(w * t) + 0.3 * np.sin(w * 2 * t)
    elif voice_name == "echo":
        audio = np.sin(w * t) * np.exp(-t * 3)
    elif voice_name == "fable":
        audio = 0.7 * np.sin(w * t)
    elif voice_name == "onyx":
        audio = 0.8 * np.sin(w * t) + 0.1 * np.sin(w * 0.5 * t)
    elif voice_name == "nova":
        audio = 0.4 * np.sin(w * t) + 0.4 * np.sin(w * 0.5 * t)
    else:  # shimmer
        audio = 0.3 * np.sin(w * t) + 0.2 * np.sin(w * 1.5 * t) + 0.1 * np.sin(w * 2 * t)

    # Normalize, guarding against an empty or all-zero waveform.
    peak = np.max(np.abs(audio)) if audio.size else 0.0
    if peak > 0:
        audio = audio / peak
    return audio


def initialize_voices(sample_rate: int = 24000):
    """Initialize voice memories with consistent base samples.

    For each built-in voice, a saved memory is loaded from persistent storage
    when available. Otherwise a synthetic seed waveform is created, stored in
    ``VOICE_MEMORIES``, saved, and also written as ``<voice>_seed.wav``.

    Args:
        sample_rate: Sample rate used for the synthetic seed audio and its WAV.
    """
    # Check if persistent directory exists, create if needed
    os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True)
    logger.info(f"Using voice memories directory: {VOICE_MEMORIES_DIR}")

    # The position in this tuple is the voice's speaker ID.
    voice_names = ("alloy", "echo", "fable", "onyx", "nova", "shimmer")
    for speaker_id, voice_name in enumerate(voice_names):
        # First try to load an existing memory from persistent storage
        memory = VoiceMemory.load(voice_name)
        if memory:
            VOICE_MEMORIES[voice_name] = memory
            logger.info(f"Loaded existing voice memory for {voice_name} with {len(memory.audio_segments)} segments")
            continue

        # If no memory exists, create a new one. The seed waveform is fully
        # deterministic, so the global NumPy RNG is intentionally left alone.
        characteristics = VOICE_CHARACTERISTICS[voice_name]
        audio = _synthesize_seed_audio(voice_name, characteristics["pitch"], sample_rate)
        audio_tensor = torch.tensor(audio, dtype=torch.float32)

        memory = VoiceMemory(
            name=voice_name,
            speaker_id=speaker_id,
            audio_segments=[audio_tensor],
            text_segments=[f"This is the voice of {voice_name}"],
            pitch_base=characteristics["pitch"],
            timbre=characteristics["timbre"],
        )
        # Save the voice memory to persistent storage
        memory.save()
        # Store in dictionary
        VOICE_MEMORIES[voice_name] = memory

        # Save as wav for reference. This is best-effort, like memory.save():
        # a failed write must not stop the remaining voices from initializing.
        save_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_seed.wav")
        try:
            torchaudio.save(save_path, audio_tensor.unsqueeze(0), sample_rate)
        except Exception as e:
            logger.error(f"Error saving seed audio for {voice_name}: {e}")
        logger.info(f"Initialized new voice memory for {voice_name}")
def get_voice_context(voice_name: str, device: torch.device, max_segments: int = 2) -> List[Segment]:
    """Return context segments for ``voice_name``, falling back to ``alloy``.

    Voice memories are initialized first when none are registered yet. An
    unknown voice name is logged as a warning and the ``alloy`` memory is used
    instead.
    """
    if not VOICE_MEMORIES:
        initialize_voices()

    resolved_name = voice_name
    if resolved_name not in VOICE_MEMORIES:
        # Default to alloy if voice not found
        logger.warning(f"Voice {voice_name} not found, defaulting to alloy")
        resolved_name = "alloy"

    memory = VOICE_MEMORIES[resolved_name]
    return memory.get_context_segments(device, max_segments=max_segments)
def update_voice_memory(voice_name: str, audio: torch.Tensor, text: str):
    """Update voice memory with newly generated audio and save to persistent storage.

    Args:
        voice_name: Name of the voice whose memory should be extended.
        audio: Newly generated audio for that voice.
        text: Text associated with ``audio``.

    Unknown voices are skipped with a warning instead of being dropped
    silently.
    """
    if not VOICE_MEMORIES:
        initialize_voices()

    memory = VOICE_MEMORIES.get(voice_name)
    if memory is None:
        logger.warning(f"Voice {voice_name} not found, skipping voice memory update")
        return

    memory.update_with_new_audio(audio, text)
    memory.save()
    logger.info(f"Updated voice memory for {voice_name}, now has {len(memory.audio_segments)} segments")
def _load_reference_wav(path: str, target_sample_rate: int) -> torch.Tensor:
    """Load a saved reference WAV and resample it to ``target_sample_rate`` if needed."""
    waveform, source_rate = torchaudio.load(path)
    # assumes a mono file, as written by this module; squeeze(0) leaves a
    # multi-channel tensor untouched — TODO confirm for external files
    waveform = waveform.squeeze(0)
    if source_rate != target_sample_rate:
        waveform = torchaudio.functional.resample(
            waveform, orig_freq=source_rate, new_freq=target_sample_rate
        )
    return waveform


def generate_voice_samples(app_state):
    """Generate high-quality voice samples for each voice.

    For every built-in voice this first obtains one sample per intro text
    (loaded from ``<voice>_sample_<i>.wav`` when present, generated without
    context otherwise) and replaces the voice memory's segments with them.
    When at least two samples are available, a longer "character" sample is
    then loaded or generated using the first sample as context and appended
    to the memory.

    Per-sample failures are logged and do not stop the remaining work.

    Args:
        app_state: The FastAPI app state containing the generator
    """
    generator = app_state.generator
    if not generator:
        logger.error("Cannot generate voice samples: generator not available")
        return
    logger.info("Beginning voice sample generation...")
    # Ensure persistent directory exists
    os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True)
    # Ensure there are memories to update, so generated audio is not wasted.
    if not VOICE_MEMORIES:
        initialize_voices()

    # The position in this tuple is the voice's speaker ID.
    voice_names = ("alloy", "echo", "fable", "onyx", "nova", "shimmer")
    for speaker_id, voice_name in enumerate(voice_names):
        # Get multiple sample texts for this voice
        sample_texts = VOICE_INTROS[voice_name]
        logger.info(f"Generating samples for voice: {voice_name}")
        audio_segments = []
        text_segments = []

        for i, sample_text in enumerate(sample_texts):
            try:
                # Reuse a previously written sample when there is one
                sample_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_sample_{i}.wav")
                if os.path.exists(sample_path):
                    logger.info(f"Found existing sample {i+1} for {voice_name}, loading from {sample_path}")
                    audio_segments.append(_load_reference_wav(sample_path, generator.sample_rate))
                    text_segments.append(sample_text)
                    continue

                # Generate without context first for seed samples
                logger.info(f"Generating sample {i+1}/{len(sample_texts)} for {voice_name}: '{sample_text}'")
                audio = generator.generate(
                    text=sample_text,
                    speaker=speaker_id,
                    context=[],  # No context for initial samples
                    max_audio_length_ms=10000,
                    temperature=0.7,  # Lower temperature for more stable output
                    topk=30,
                )
                audio_cpu = audio.detach().cpu()
                audio_segments.append(audio_cpu)
                text_segments.append(sample_text)
                # Save as WAV for reference to persistent storage
                torchaudio.save(sample_path, audio_cpu.unsqueeze(0), generator.sample_rate)
                # assumes generate() returns a 1-D tensor of samples — TODO confirm
                logger.info(f"Generated sample {i+1} for {voice_name}, length: {audio.shape[0]/generator.sample_rate:.2f}s")
            except Exception as e:
                logger.error(f"Error generating sample {i+1} for {voice_name}: {e}")

        memory = VOICE_MEMORIES.get(voice_name)
        if memory is None or not audio_segments:
            continue

        # Replace existing samples with these high quality ones
        memory.audio_segments = audio_segments
        memory.text_segments = text_segments
        memory.save()
        logger.info(f"Updated voice memory for {voice_name} with {len(audio_segments)} high-quality samples")

        # Now generate a second pass with context from these samples
        if len(audio_segments) < 2:
            continue

        # The same transcript is used for a cached and a fresh character
        # sample, because the cached WAV was generated from exactly this text.
        character_sample_text = (
            f"I'm the voice assistant known as {voice_name}. I'm designed to have a distinctive voice that you can easily recognize. "
            "My speech patterns and tone should remain consistent throughout our conversation."
        )
        try:
            character_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_character.wav")
            if os.path.exists(character_path):
                logger.info(f"Found existing character sample for {voice_name}, loading from {character_path}")
                memory.audio_segments.append(_load_reference_wav(character_path, generator.sample_rate))
                memory.text_segments.append(character_sample_text)
                memory.save()
                continue

            # Condition on the first intro sample to build voice consistency
            context = [
                Segment(
                    speaker=speaker_id,
                    text=text_segments[0],
                    audio=audio_segments[0].to(generator.device),
                )
            ]
            logger.info(f"Generating character sample for {voice_name} with context")
            character_audio = generator.generate(
                text=character_sample_text,
                speaker=speaker_id,
                context=context,
                max_audio_length_ms=15000,
                temperature=0.7,
                topk=30,
            )
            character_cpu = character_audio.detach().cpu()
            # Save this comprehensive character sample to persistent storage
            torchaudio.save(character_path, character_cpu.unsqueeze(0), generator.sample_rate)
            # Add this to the memory as well
            memory.audio_segments.append(character_cpu)
            memory.text_segments.append(character_sample_text)
            memory.save()
            logger.info(f"Generated character sample for {voice_name}, length: {character_audio.shape[0]/generator.sample_rate:.2f}s")
        except Exception as e:
            logger.error(f"Error generating character sample for {voice_name}: {e}")
def create_custom_voice(
    app_state,
    name: str,
    initial_text: str,
    speaker_id: int = 0,
    pitch: Optional[float] = None,
    timbre: str = "custom"
) -> Dict:
    """Create a new custom voice.

    Args:
        app_state: The FastAPI app state containing the generator
        name: Name for the new voice. It is used as a file name, so it must be
            non-empty and must not contain path separators or be "." / "..".
        initial_text: Text for the initial voice sample
        speaker_id: Base speaker ID (0-5)
        pitch: Base pitch in Hz (optional); defaults to the pitch of the
            built-in voice with the same speaker ID
        timbre: Voice quality descriptor

    Returns:
        Dict with creation status and voice info. Failures are reported as
        ``{"status": "error", "message": ...}`` rather than raised.
    """
    generator = app_state.generator
    if not generator:
        return {"status": "error", "message": "Generator not available"}

    # ``name`` ends up in file paths below; refuse anything that could escape
    # the voice memories directory.
    if (
        not isinstance(name, str)
        or not name
        or name in (".", "..")
        or any(sep in name for sep in ("/", "\\", "\0"))
    ):
        return {"status": "error", "message": f"Invalid voice name: {name!r}"}

    # Check if voice already exists
    if not VOICE_MEMORIES:
        initialize_voices()
    if name in VOICE_MEMORIES:
        return {"status": "error", "message": f"Voice '{name}' already exists"}

    # Generate a voice sample
    try:
        logger.info(f"Creating custom voice '{name}' with text: '{initial_text}'")
        audio = generator.generate(
            text=initial_text,
            speaker=speaker_id,
            context=[],
            max_audio_length_ms=10000,
            temperature=0.7,
        )
        audio_cpu = audio.detach().cpu()

        # Determine base pitch if not provided: speaker IDs 0-4 map to
        # alloy/echo/fable/onyx/nova, anything else uses the shimmer pitch.
        if pitch is None:
            default_pitch_by_speaker = {0: 220.0, 1: 330.0, 2: 523.0, 3: 165.0, 4: 392.0}
            pitch = default_pitch_by_speaker.get(speaker_id, 587.0)

        # Write the reference WAV before registering the voice, so a failed
        # write cannot leave a registered voice behind an error response.
        sample_path = os.path.join(VOICE_MEMORIES_DIR, f"{name}_sample.wav")
        torchaudio.save(sample_path, audio_cpu.unsqueeze(0), generator.sample_rate)

        # Create a new voice memory
        memory = VoiceMemory(
            name=name,
            speaker_id=speaker_id,
            audio_segments=[audio_cpu],
            text_segments=[initial_text],
            pitch_base=pitch,
            timbre=timbre,
        )
        # Save the voice memory to persistent storage
        memory.save()
        VOICE_MEMORIES[name] = memory

        logger.info(f"Created custom voice '{name}' successfully")
        return {
            "status": "success",
            "message": f"Voice '{name}' created successfully",
            "voice": {
                "name": name,
                "speaker_id": speaker_id,
                "pitch": pitch,
                "timbre": timbre,
                # assumes generate() returns a 1-D tensor of samples — TODO confirm
                "sample_length_seconds": audio.shape[0] / generator.sample_rate,
            },
        }
    except Exception as e:
        logger.error(f"Error creating custom voice '{name}': {e}")
        return {
            "status": "error",
            "message": f"Error creating voice: {str(e)}"
        }