# NOTE: page residue captured with this file ("Spaces: Sleeping") - not part of the program.
import dash | |
from dash import dcc, html, Input, Output, State, callback | |
import dash_bootstrap_components as dbc | |
import base64 | |
import io | |
import os | |
from snac import SNAC | |
import torch | |
from transformers import AutoModelForCausalLM, AutoTokenizer | |
import google.generativeai as genai | |
import re | |
import logging | |
import numpy as np | |
from pydub import AudioSegment | |
from docx import Document | |
import PyPDF2 | |
from tqdm import tqdm | |
import soundfile as sf | |
# Logging: INFO level on the root logger, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Run on CUDA when torch reports it as available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# SNAC model, moved to the selected device.
print("Loading SNAC model...")
snac_model = SNAC.from_pretrained("hubertsiuzdak/snac_24khz").to(device)

# Orpheus causal language model (bfloat16 weights) and its tokenizer.
model_name = "canopylabs/orpheus-3b-0.1-ft"
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16)
model.to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name)
print(f"Orpheus model loaded to {device}")

# Voice names offered in the UI, and the emotive tags the script prompt allows.
VOICES = ["tara", "leah", "jess", "leo", "dan", "mia", "zac", "zoe"]
EMOTIVE_TAGS = ["<laugh>", "<chuckle>", "<sigh>", "<cough>", "<sniffle>", "<groan>", "<yawn>", "<gasp>"]

# The Dash application, styled with the Bootstrap theme.
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])
def _script_inputs_column():
    """Build the left column: podcast details, source text/upload and script controls."""
    upload_box_style = {
        'width': '100%',
        'height': '60px',
        'lineHeight': '60px',
        'borderWidth': '1px',
        'borderStyle': 'dashed',
        'borderRadius': '5px',
        'textAlign': 'center',
        'margin': '10px 0'
    }
    host_count_options = [{"label": i, "value": i} for i in ["1", "2"]]
    return dbc.Col([
        dbc.Input(id="host1-name", placeholder="Enter name of first host", className="mb-2"),
        dbc.Input(id="host2-name", placeholder="Enter name of second host", className="mb-2"),
        dbc.Input(id="podcast-name", placeholder="Enter podcast name", className="mb-2"),
        dbc.Input(id="podcast-topic", placeholder="Enter podcast topic", className="mb-2"),
        dbc.Textarea(id="prompt", placeholder="Enter your text here...", rows=5, className="mb-2"),
        dcc.Upload(
            id='upload-file',
            children=html.Div(['Drag and Drop or ', html.A('Select a File')]),
            style=upload_box_style,
        ),
        html.Label("Duration (minutes)", className="mt-2"),
        dcc.Slider(id="duration", min=1, max=60, value=5, step=1, marks={1: '1', 30: '30', 60: '60'}, className="mb-2"),
        html.Label("Number of Hosts", className="mt-2"),
        dbc.RadioItems(
            id="num-hosts",
            options=host_count_options,
            value="1",
            inline=True,
            className="mb-2"
        ),
        dbc.Button("Generate Podcast Script", id="generate-script-btn", color="primary", className="mb-2"),
        dbc.Spinner(html.Div(id="script-loading"), color="primary"),
    ], width=6)


def _advanced_settings_panel():
    """Build the collapsible panel holding the sampling sliders (closed initially)."""
    return dbc.Collapse([
        html.Label("Temperature", className="mt-2"),
        dcc.Slider(id="temperature", min=0.1, max=1.5, value=0.6, step=0.05, marks={0.1: '0.1', 0.8: '0.8', 1.5: '1.5'}, className="mb-2"),
        html.Label("Top P", className="mt-2"),
        dcc.Slider(id="top-p", min=0.1, max=1.0, value=0.9, step=0.05, marks={0.1: '0.1', 0.5: '0.5', 1.0: '1.0'}, className="mb-2"),
        html.Label("Repetition Penalty", className="mt-2"),
        dcc.Slider(id="repetition-penalty", min=1.0, max=2.0, value=1.2, step=0.1, marks={1.0: '1.0', 1.5: '1.5', 2.0: '2.0'}, className="mb-2"),
        html.Label("Max New Tokens", className="mt-2"),
        dcc.Slider(id="max-new-tokens", min=100, max=16384, value=4096, step=100, marks={100: '100', 8192: '8192', 16384: '16384'}, className="mb-2"),
    ], id="advanced-settings", is_open=False)


def _audio_controls_column():
    """Build the right column: script text, voice pickers, audio output and advanced settings."""
    voice_options_1 = [{"label": v, "value": v} for v in VOICES]
    voice_options_2 = [{"label": v, "value": v} for v in VOICES]
    return dbc.Col([
        dbc.Textarea(id="script-output", placeholder="Generated script will appear here...", rows=10, className="mb-2"),
        dbc.Button("Clear", id="clear-btn", color="secondary", className="mb-2 d-block"),
        html.Label("Voice 1", className="mt-3"),
        dcc.Dropdown(id="voice1", options=voice_options_1, value="tara", className="mb-2"),
        html.Label("Voice 2", className="mt-2"),
        dcc.Dropdown(id="voice2", options=voice_options_2, value="zac", className="mb-2"),
        dbc.Button("Generate Audio", id="generate-audio-btn", color="success", className="mb-2"),
        dbc.Spinner(html.Div(id="audio-loading"), color="primary"),
        html.Div(id="audio-output"),
        dbc.Button("Advanced Settings", id="advanced-settings-toggle", color="info", className="mb-2"),
        _advanced_settings_panel(),
    ], width=6)


# Page layout: a title row, a two-column working area, and two client-side stores.
app.layout = dbc.Container([
    dbc.Row([
        dbc.Col([
            html.H1("Orpheus Text-to-Speech", className="text-center mb-4"),
        ], width=12),
    ]),
    dbc.Row([
        _script_inputs_column(),
        _audio_controls_column(),
    ]),
    dcc.Store(id='generated-script'),
    dcc.Store(id='generated-audio'),
])
def process_prompt(prompt, voice, tokenizer, device):
    """Tokenize one piece of text and wrap it in the model's special tokens.

    Args:
        prompt: Text to be spoken.
        voice: Voice name, prefixed to the text as ``"<voice>: <text>"``.
            When falsy (for example the voice dropdown was cleared and
            delivers ``None``) the text is used without a prefix instead of
            being sent as ``"None: <text>"``.
        tokenizer: Callable invoked as ``tokenizer(text, return_tensors="pt")``
            whose result exposes ``input_ids``.
        device: Device both returned tensors are moved to.

    Returns:
        Tuple ``(input_ids, attention_mask)``; the mask is all ones and has
        the same shape as ``input_ids``.
    """
    # Special token IDs placed before and after the tokenized text.
    # NOTE(review): presumably the Orpheus turn markers - confirm against the model card.
    start_token_id = 128259
    end_token_ids = (128009, 128260)

    text = f"{voice}: {prompt}" if voice else prompt
    input_ids = tokenizer(text, return_tensors="pt").input_ids

    start_token = torch.tensor([[start_token_id]], dtype=torch.int64)
    end_tokens = torch.tensor([list(end_token_ids)], dtype=torch.int64)
    wrapped_ids = torch.cat([start_token, input_ids, end_tokens], dim=1)
    attention_mask = torch.ones_like(wrapped_ids)
    return wrapped_ids.to(device), attention_mask.to(device)
def parse_output(generated_ids):
    """Extract the audio codes of the first sequence from generated token IDs.

    Steps:
      1. Keep only the columns after the last occurrence of token 128257.
         When that token is absent the whole sequence is used.
      2. Drop every occurrence of token 128258 (the EOS id passed to
         ``model.generate`` in this file).
      3. Truncate to a whole number of 7-token groups, the group size that
         ``redistribute_codes`` consumes.
      4. Subtract 128266 from every remaining token.

    Args:
        generated_ids: 2-D integer tensor of shape ``(batch, sequence)``.

    Returns:
        List of Python ints for the first row; ``[]`` when the batch is empty
        or fewer than 7 usable tokens remain.
    """
    start_marker = 128257
    eos_token = 128258
    # NOTE(review): presumably the ID of the first audio-code token - confirm.
    code_offset = 128266
    group_size = 7

    marker_columns = (generated_ids == start_marker).nonzero(as_tuple=True)[1]
    if marker_columns.numel() > 0:
        generated_ids = generated_ids[:, marker_columns[-1].item() + 1:]

    if generated_ids.size(0) == 0:
        return []

    # Only the first sequence is ever returned, so only that row is processed.
    row = generated_ids[0]
    row = row[row != eos_token]
    usable = (row.size(0) // group_size) * group_size

    # One vectorised subtraction and a single transfer to Python ints, instead
    # of one 0-dim tensor (and one device sync) per code.
    return (row[:usable] - code_offset).tolist()
def redistribute_codes(code_list, snac_model):
    """Split a flat code list into three layers and decode them to audio.

    The codes are consumed in groups of 7. Within a group, position ``k``
    has ``k * 4096`` subtracted and is routed as follows:

      * position 0             -> layer 1 (1 code per group)
      * positions 1 and 4      -> layer 2 (2 codes per group)
      * positions 2, 3, 5, 6   -> layer 3 (4 codes per group)

    Trailing codes that do not fill a whole group are ignored.

    Args:
        code_list: Sequence of integer codes (Python ints or 0-dim tensors).
        snac_model: Model exposing ``parameters()`` and ``decode(codes)``.

    Returns:
        The decoded audio as a squeezed NumPy array on the CPU.

    Raises:
        ValueError: If ``code_list`` holds fewer than 7 codes, so there is
            nothing to decode.
    """
    group_size = 7
    layer_stride = 4096

    # Floor division: the previous ``(len + 1) // 7`` read past the end of the
    # list whenever the length was 6 modulo 7.
    n_groups = len(code_list) // group_size
    if n_groups == 0:
        raise ValueError("code_list must contain at least 7 codes to decode audio")

    target_device = next(snac_model.parameters()).device  # build codes where the model lives

    groups = torch.tensor(
        [int(code) for code in code_list[:n_groups * group_size]],
        dtype=torch.int64,
        device=target_device,
    ).view(n_groups, group_size)
    # Remove the per-position offset from every group at once.
    groups = groups - torch.arange(group_size, device=target_device) * layer_stride

    codes = [
        groups[:, 0].unsqueeze(0),
        groups[:, [1, 4]].reshape(-1).unsqueeze(0),
        groups[:, [2, 3, 5, 6]].reshape(-1).unsqueeze(0),
    ]

    # Decoding is inference only; skip autograd bookkeeping.
    with torch.no_grad():
        audio_hat = snac_model.decode(codes)
    return audio_hat.detach().squeeze().cpu().numpy()  # always a CPU NumPy array
def detect_silence(audio, threshold=0.005, min_silence_duration=1.3, sample_rate=24000):
    """Find long runs of consecutive near-silent samples.

    A sample is silent when its absolute value is below ``threshold``. A run
    is reported when ``(end - start) / sample_rate`` is at least
    ``min_silence_duration`` seconds.

    Args:
        audio: 1-D array of samples.
        threshold: Absolute amplitude below which a sample counts as silent.
        min_silence_duration: Minimum run length, in seconds, to report.
        sample_rate: Samples per second of ``audio``. Defaults to 24000, the
            value that used to be hard-coded here.

    Returns:
        List of ``(start, end)`` sample-index tuples, ``end`` inclusive, in
        ascending order. Empty when no run is long enough.
    """
    samples = np.atleast_1d(np.asarray(audio))
    silent_idx = np.where(np.abs(samples) < threshold)[0]
    if silent_idx.size == 0:
        return []

    # A gap larger than 1 between neighbouring silent indices ends one run and
    # starts the next.
    breaks = np.flatnonzero(np.diff(silent_idx) > 1)
    starts = np.concatenate(([silent_idx[0]], silent_idx[breaks + 1]))
    ends = np.concatenate((silent_idx[breaks], [silent_idx[-1]]))

    return [
        (int(start), int(end))
        for start, end in zip(starts, ends)
        if (end - start) / sample_rate >= min_silence_duration
    ]
def generate_audio(script_output, voice1, voice2, num_hosts, temperature, top_p, repetition_penalty, max_new_tokens):
    """Synthesize a script paragraph by paragraph and return 16-bit PCM audio.

    The script is split on blank lines. With one host every paragraph uses
    ``voice1``; otherwise paragraphs alternate ``voice1``, ``voice2``,
    ``voice1``, ... counting only non-empty paragraphs. Each paragraph is
    generated with the Orpheus model, decoded with SNAC, cut at the end of its
    last long silence (if any), and the pieces are concatenated and
    peak-normalised.

    Args:
        script_output: Full script text.
        voice1: Voice for the first (or only) host.
        voice2: Voice for the second host.
        num_hosts: ``"1"`` (or ``1``) for a monologue; anything else alternates.
        temperature: Sampling temperature passed to ``model.generate``.
        top_p: Nucleus-sampling threshold passed to ``model.generate``.
        repetition_penalty: Repetition penalty passed to ``model.generate``.
        max_new_tokens: Token budget per paragraph.

    Returns:
        ``numpy.int16`` array of samples, or ``None`` when nothing could be
        generated or an error occurred (the error is logged).
    """
    try:
        # Drop blank paragraphs BEFORE numbering them; otherwise an empty
        # paragraph shifts the even/odd voice alternation for the rest of the
        # script. ``\n\s*\n`` also accepts CRLF and whitespace-only lines.
        paragraphs = [
            chunk.strip()
            for chunk in re.split(r'\n\s*\n', script_output or "")
            if chunk.strip()
        ]
        if not paragraphs:
            logger.error("Error generating speech: script contains no text")
            return None

        single_host = str(num_hosts) == "1"
        audio_samples = []
        for index, paragraph in enumerate(tqdm(paragraphs, desc="Generating audio")):
            voice = voice1 if single_host or index % 2 == 0 else voice2
            input_ids, attention_mask = process_prompt(paragraph, voice, tokenizer, device)
            with torch.no_grad():
                generated_ids = model.generate(
                    input_ids,
                    attention_mask=attention_mask,
                    do_sample=True,
                    temperature=temperature,
                    top_p=top_p,
                    repetition_penalty=repetition_penalty,
                    max_new_tokens=int(max_new_tokens),
                    num_return_sequences=1,
                    eos_token_id=128258,
                )
            code_list = parse_output(generated_ids)
            if len(code_list) < 7:
                # One paragraph without audio codes should not discard the
                # audio already produced for the others.
                logger.warning("No audio codes generated for paragraph %d; skipping", index + 1)
                continue
            paragraph_audio = redistribute_codes(code_list, snac_model)

            # Keep everything up to the end of the last long silence.
            silences = detect_silence(paragraph_audio)
            if silences:
                paragraph_audio = paragraph_audio[:silences[-1][1]]
            audio_samples.append(paragraph_audio)

        if not audio_samples:
            logger.error("Error generating speech: no audio was produced")
            return None

        final_audio = np.concatenate(audio_samples)
        if final_audio.size == 0:
            logger.error("Error generating speech: generated audio is empty")
            return None

        # Peak-normalise to the int16 range; pure silence has peak 0 and would
        # otherwise divide by zero.
        peak = np.max(np.abs(final_audio))
        if peak == 0:
            return np.zeros(final_audio.shape, dtype=np.int16)
        return np.int16(final_audio / peak * 32767)
    except Exception as exc:
        logger.exception("Error generating speech: %s", exc)
        return None
def _extract_uploaded_text(upload_contents):
    """Return the text held in an uploaded file given its ``dcc.Upload`` contents string.

    PDF files are recognised by their ``%PDF`` signature; anything else is
    tried as UTF-8 text and finally as a DOCX document.

    Raises:
        ValueError: If the contents string is malformed or the file cannot be
            read as PDF, UTF-8 text, or DOCX.
    """
    # The contents string is "<content type>,<base64 payload>"; split once so a
    # stray comma cannot break unpacking.
    _content_type, separator, encoded = upload_contents.partition(',')
    if not separator:
        raise ValueError("Unsupported file type or corrupted file")
    raw = base64.b64decode(encoded)

    if raw[:4] == b'%PDF':
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(raw))
        # extract_text() can yield nothing for image-only pages.
        return "\n".join((page.extract_text() or "") for page in pdf_reader.pages)

    try:
        return raw.decode('utf-8')
    except UnicodeDecodeError:
        pass

    try:
        doc = Document(io.BytesIO(raw))
    except Exception as exc:
        raise ValueError("Unsupported file type or corrupted file") from exc
    return "\n".join(para.text for para in doc.paragraphs)


def _build_script_prompt(num_hosts, combined_content, duration, host1_name, host2_name, podcast_name, podcast_topic):
    """Render the instruction prompt sent to Gemini. ``num_hosts`` must be an int."""
    return f"""
Create a podcast script for {num_hosts} {'person' if num_hosts == 1 else 'people'} discussing:
{combined_content}
Duration: {duration} minutes. Include natural speech, humor, and occasional off-topic thoughts.
Use speech fillers like um, ah. Vary emotional tone.
Format: {'Monologue' if num_hosts == 1 else 'Alternating dialogue'} without speaker labels.
Separate {'paragraphs' if num_hosts == 1 else 'lines'} with blank lines.
If the number of {num_hosts} is 1 then each paragraph will be no more than 3 sentences each
Only provide the dialog for text to speech.
Only use these emotion tags in angle brackets: {', '.join(EMOTIVE_TAGS)}.
-Example: "I can't believe I stayed up all night <yawn> only to find out the meeting was canceled <groan>."
Ensure content flows naturally and stays on topic. Match the script length to {duration} minutes.
Do not include speaker labels like "jane:" or "john:" before dialogue.
The intro always includes the ({host1_name} and/or {host2_name}) if it exists and should be in the same paragraph.
The outro always includes the ({host1_name} and/or {host2_name}) if it exists and should be in the same paragraph
Do not include these types of transitions in the intro, outro or between paragraphs for example: "Intro Music fades in...". Its just dialog.
Keep each speaker's entire monologue in a single paragraph, regardless of length if the number of hosts is not 1.
Start a new paragraph only when switching to a different speaker if the number of hosts is not 1.
Maintain natural conversation flow and speech patterns within each monologue.
Use context clues or subtle references to indicate who is speaking without explicit labels if the number of hosts is not 1.
Use speaker names ({host1_name} and/or {host2_name}) sparingly, only when necessary for clarity or emphasis. Avoid starting every line with the other person's name.
Rely more on context and speech patterns to indicate who is speaking, rather than always stating names.
Use names primarily for transitions sparingly, definitely with agreements, or to draw attention to a specific point, not as a constant form of address.
{'Make sure the script is a monologue for one person.' if num_hosts == 1 else f'Ensure the dialogue alternates between two distinct voices, with {host1_name} speaking on odd-numbered lines and {host2_name} on even-numbered lines.'}
Always include intro with the speaker name and its the podcast name "{podcast_name}" in intoduce the topic of the podcast with "{podcast_topic}".
Incorporate the podcast name and topic naturally into the intro and outro, and ensure the content stays relevant to the specified topic throughout the script.
"""


def _sanitize_script(text):
    """Strip characters the TTS stage should not see, keeping apostrophes.

    Apostrophes are kept (curly ones are normalised first) because deleting
    them turns contractions into other words, e.g. "we're" -> "were".
    """
    text = text.replace("\u2019", "'")
    return re.sub(r"[^a-zA-Z0-9\s.,?!<>']", '', text)


def _audio_player(final_audio):
    """Encode PCM samples as an in-page WAV player plus a download link."""
    buffer = io.BytesIO()
    sf.write(buffer, final_audio, 24000, format='WAV', subtype='PCM_16')
    audio_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
    src = f"data:audio/wav;base64,{audio_base64}"
    logger.info("Generated audio file size: %d bytes", len(audio_base64))
    return html.Div([
        html.Audio(src=src, controls=True),
        html.Br(),
        html.A("Download Audio", href=src, download="generated_audio.wav"),
    ])


# Register the handler with Dash. Without this registration the function was
# never invoked, so none of the buttons did anything.
# NOTE(review): the function returns six values but only five targets are
# evident from the layout. The fourth slot is always ``no_update`` and is bound
# here to the otherwise unused "generated-script" store - confirm the intended
# target.
@callback(
    Output("script-output", "value"),
    Output("audio-output", "children"),
    Output("advanced-settings", "is_open"),
    Output("generated-script", "data"),
    Output("script-loading", "children"),
    Output("audio-loading", "children"),
    Input("generate-script-btn", "n_clicks"),
    Input("generate-audio-btn", "n_clicks"),
    Input("advanced-settings-toggle", "n_clicks"),
    Input("clear-btn", "n_clicks"),
    State("host1-name", "value"),
    State("host2-name", "value"),
    State("podcast-name", "value"),
    State("podcast-topic", "value"),
    State("prompt", "value"),
    State("upload-file", "contents"),
    State("duration", "value"),
    State("num-hosts", "value"),
    State("script-output", "value"),
    State("voice1", "value"),
    State("voice2", "value"),
    State("temperature", "value"),
    State("top-p", "value"),
    State("repetition-penalty", "value"),
    State("max-new-tokens", "value"),
    State("advanced-settings", "is_open"),
    prevent_initial_call=True,
)
def combined_callback(generate_script_clicks, generate_audio_clicks, advanced_settings_clicks, clear_clicks,
                      host1_name, host2_name, podcast_name, podcast_topic, prompt, uploaded_file, duration, num_hosts,
                      script_output, voice1, voice2, temperature, top_p, repetition_penalty, max_new_tokens, is_advanced_open):
    """Handle every button of the page in a single Dash callback.

    Dispatches on the component that fired:

      * ``advanced-settings-toggle`` flips the advanced panel open/closed.
      * ``clear-btn`` empties the script textarea.
      * ``generate-script-btn`` asks Gemini for a script built from the
        prompt text and/or the uploaded file.
      * ``generate-audio-btn`` synthesizes the current script to audio.

    Returns:
        6-tuple ``(script text, audio output children, advanced panel open,
        unused slot, script spinner children, audio spinner children)``.
        Slots that should stay unchanged carry ``dash.no_update``; the two
        spinner slots are always ``""``.
    """
    def reply(script=dash.no_update, audio=dash.no_update, panel_open=dash.no_update):
        # Single place that fixes the order of the six outputs.
        return script, audio, panel_open, dash.no_update, "", ""

    ctx = dash.callback_context
    if not ctx.triggered:
        return reply()
    trigger_id = ctx.triggered[0]['prop_id'].split('.')[0]

    if trigger_id == "advanced-settings-toggle":
        return reply(panel_open=not is_advanced_open)

    if trigger_id == "clear-btn":
        # Previously unhandled: the button fired the callback but changed nothing.
        return reply(script="")

    if trigger_id == "generate-script-btn":
        try:
            api_key = os.environ.get("GEMINI_API_KEY")
            if not api_key:
                raise ValueError("Gemini API key not found in environment variables")
            genai.configure(api_key=api_key)
            # Named so it cannot be mistaken for the module-level Orpheus ``model``.
            gemini_model = genai.GenerativeModel('gemini-2.5-pro-preview-03-25')

            combined_content = prompt or ""
            if uploaded_file:
                file_content = _extract_uploaded_text(uploaded_file)
                combined_content = f"{combined_content}\n{file_content}" if combined_content else file_content

            host_count = int(num_hosts) if num_hosts else 1
            prompt_template = _build_script_prompt(
                host_count, combined_content, duration,
                host1_name, host2_name, podcast_name, podcast_topic,
            )
            response = gemini_model.generate_content(prompt_template)
            return reply(script=_sanitize_script(response.text))
        except Exception as e:
            # Boundary handler: show the failure in the script box instead of
            # letting the callback crash.
            logger.exception("Error generating podcast script: %s", e)
            return reply(script=f"Error: {str(e)}")

    if trigger_id == "generate-audio-btn":
        # An untouched textarea delivers None rather than "".
        if not (script_output or "").strip():
            return reply(audio=html.Div("No audio generated yet."))
        final_audio = generate_audio(script_output, voice1, voice2, num_hosts, temperature, top_p, repetition_penalty, max_new_tokens)
        if final_audio is None:
            logger.error("Failed to generate audio")
            return reply(audio=html.Div("Error generating audio"))
        return reply(audio=_audio_player(final_audio))

    return reply()
# Run the app
if __name__ == '__main__':
    print("Starting the Dash application...")
    # The server listens on all interfaces (0.0.0.0). Debug mode enables the
    # interactive Werkzeug debugger, which lets anyone who can reach the port
    # execute code, so it is off unless explicitly requested with DASH_DEBUG=1.
    debug_enabled = os.environ.get("DASH_DEBUG", "").strip().lower() in {"1", "true", "yes"}
    app.run(debug=debug_enabled, host='0.0.0.0', port=7860)
    print("Dash application has finished running.")