# Hugging Face Spaces page banner captured with the file (status: Running); not part of the program.
import os | |
# WATERMARK_KEY is parsed later with os.getenv(...).split(...); seed a
# placeholder ahead of the remaining imports so a missing variable does not
# surface as a NoneType error.
os.environ.setdefault("WATERMARK_KEY", "0 0 0 0")  # Default placeholder
import subprocess | |
import tempfile | |
import gradio as gr | |
import numpy as np | |
import spaces | |
import torch | |
import torchaudio | |
from generator import Segment, load_csm_1b | |
from watermarking import watermark | |
# Integer taken from the GPU_TIMEOUT environment variable (60 when unset).
# NOTE(review): presumably a GPU time budget in seconds; nothing in the
# visible code consumes it — confirm.
gpu_timeout = int(os.environ.get("GPU_TIMEOUT", "60"))
# Markdown banner rendered as the first element of the page.
SPACE_INTRO_TEXT = """\
# Sesame CSM 1B
Generate from CSM 1B (Conversational Speech Model).
Code is available on GitHub: [SesameAILabs/csm](https://github.com/SesameAILabs/csm).
Checkpoint is [hosted on HuggingFace](https://huggingface.co/sesame/csm-1b).
---
"""
# Markdown heading and instructions rendered above the conversation input controls.
CONVO_INTRO_TEXT = """\
## Conversation content
Each line is an utterance in the conversation to generate. Speakers alternate between A and B, starting with speaker A.
"""
# Initial value of the "Conversation" text area: one utterance per line.
DEFAULT_CONVERSATION = """\
Hey how are you doing.
Pretty good, pretty good.
I'm great, so happy to be speaking to you.
Me too, this is some cool stuff huh?
Yeah, I've been reading more about speech generation, and it really seems like context is important.
Definitely.
"""
# Predefined voice prompts, keyed by preset name. Each entry holds a "text"
# string and an "audio" file path; create_speaker_prompt_ui lists the keys in a
# dropdown and uses the selected entry as the initial widget values.
# NOTE(review): "text" is presumably the transcript of the "audio" recording — confirm.
SPEAKER_PROMPTS = {
    "conversational_a": {
        "text": (
            "like revising for an exam I'd have to try and like keep up the momentum because I'd "
            "start really early I'd be like okay I'm gonna start revising now and then like "
            "you're revising for ages and then I just like start losing steam I didn't do that "
            "for the exam we had recently to be fair that was a more of a last minute scenario "
            "but like yeah I'm trying to like yeah I noticed this yesterday that like Mondays I "
            "sort of start the day with this not like a panic but like a"
        ),
        "audio": "prompts/conversational_a.wav",
    },
    "conversational_b": {
        "text": (
            "like a super Mario level. Like it's very like high detail. And like, once you get "
            "into the park, it just like, everything looks like a computer game and they have all "
            "these, like, you know, if, if there's like a, you know, like in a Mario game, they "
            "will have like a question block. And if you like, you know, punch it, a coin will "
            "come out. So like everyone, when they come into the park, they get like this little "
            "bracelet and then you can go punching question blocks around."
        ),
        "audio": "prompts/conversational_b.wav",
    },
    "read_speech_a": {
        "text": (
            "And Lake turned round upon me, a little abruptly, his odd yellowish eyes, a little "
            "like those of the sea eagle, and the ghost of his smile that flickered on his "
            "singularly pale face, with a stern and insidious look, confronted me."
        ),
        "audio": "prompts/read_speech_a.wav",
    },
    "read_speech_b": {
        "text": (
            "He was such a big boy that he wore high boots and carried a jack knife. He gazed and "
            "gazed at the cap, and could not keep from fingering the blue tassel."
        ),
        "audio": "prompts/read_speech_b.wav",
    },
}
# Use CUDA when torch reports it as available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Module-wide model handle shared by the inference and prompt-loading functions.
generator = load_csm_1b(device=device)
def convert_ebook_to_txt(ebook_path, max_sentences=20):
    """Convert an ebook to conversation text using Calibre's ``ebook-convert``.

    The converted text is split on ``.`` into sentences; the first
    ``max_sentences`` of them are returned one per line, each terminated with
    a period, ready to be used as the conversation input.

    Args:
        ebook_path: Path of the ebook file to convert. A falsy value (``None``
            or ``""``) short-circuits and returns ``None``.
        max_sentences: Upper bound on the number of sentences kept, to avoid
            extremely long conversations.

    Returns:
        The newline-joined sentences, or ``None`` when no path was given.

    Raises:
        gr.Error: If ``ebook-convert`` cannot be run, exits with a non-zero
            status, or its output cannot be read.
    """
    if not ebook_path:
        return None

    # Only reserve an output path here; the handle is closed immediately so
    # that ebook-convert can write the file itself.
    with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as temp_txt:
        txt_path = temp_txt.name

    try:
        subprocess.run(
            ["ebook-convert", ebook_path, txt_path],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        with open(txt_path, "r", encoding="utf-8") as f:
            text_content = f.read()
    except subprocess.CalledProcessError as e:
        # The exception text only carries the exit status; the captured stderr
        # is what actually explains the failure.
        details = (e.stderr or b"").decode("utf-8", errors="replace").strip()
        raise gr.Error(f"Error converting ebook: {details or e}") from e
    except Exception as e:
        raise gr.Error(f"Error converting ebook: {str(e)}") from e
    finally:
        # Single cleanup point for both the success and the failure path.
        if os.path.exists(txt_path):
            os.unlink(txt_path)

    # Collapse internal whitespace so a sentence that wraps across several
    # lines of the ebook stays on one output line; the conversation is later
    # split on newlines, one utterance per line.
    sentences = [" ".join(chunk.split()) for chunk in text_content.split(".")]
    sentences = [sentence for sentence in sentences if sentence]
    return "\n".join(sentence + "." for sentence in sentences[:max_sentences])
def infer(
    text_prompt_speaker_a,
    text_prompt_speaker_b,
    audio_prompt_speaker_a,
    audio_prompt_speaker_b,
    gen_conversation_input,
) -> tuple[int, np.ndarray]:
    """Validate the inputs and run ``_infer``, reporting failures as ``gr.Error``.

    Args:
        text_prompt_speaker_a: Prompt text for speaker A.
        text_prompt_speaker_b: Prompt text for speaker B.
        audio_prompt_speaker_a: File path of speaker A's audio prompt.
        audio_prompt_speaker_b: File path of speaker B's audio prompt.
        gen_conversation_input: Conversation to synthesize, one utterance per
            line.

    Returns:
        Whatever ``_infer`` returns: ``(sample_rate, int16 audio array)``.

    Raises:
        gr.Error: If the conversation is empty, the combined text is too long,
            or ``_infer`` raises ``ValueError``.
    """
    # Text inputs can arrive as None (update_input_method resets the
    # conversation box to None when the input method changes); treat that as
    # empty text instead of failing on .strip().
    text_prompt_speaker_a = text_prompt_speaker_a or ""
    text_prompt_speaker_b = text_prompt_speaker_b or ""
    gen_conversation_input = gen_conversation_input or ""

    if not gen_conversation_input.strip():
        # Without this guard nothing is generated and the later concatenation
        # of zero audio segments fails with an error that is not a ValueError.
        raise gr.Error("Conversation is empty.", duration=30)

    # Estimate token limit
    total_chars = (
        len(gen_conversation_input.strip())
        + len(text_prompt_speaker_a.strip())
        + len(text_prompt_speaker_b.strip())
    )
    if total_chars >= 2000:
        raise gr.Error("Prompts and conversation too long.", duration=30)

    try:
        return _infer(
            text_prompt_speaker_a,
            text_prompt_speaker_b,
            audio_prompt_speaker_a,
            audio_prompt_speaker_b,
            gen_conversation_input,
        )
    except ValueError as e:
        raise gr.Error(f"Error generating audio: {e}", duration=120) from e
def _infer(
    text_prompt_speaker_a,
    text_prompt_speaker_b,
    audio_prompt_speaker_a,
    audio_prompt_speaker_b,
    gen_conversation_input,
) -> tuple[int, np.ndarray]:
    """Generate, watermark and encode the whole conversation.

    Each non-blank line of ``gen_conversation_input`` is synthesized in turn,
    alternating between speaker 0 (A) and speaker 1 (B). Every call to the
    generator receives both speaker prompts plus all previously generated
    segments as context.

    Args:
        text_prompt_speaker_a: Prompt text for speaker A.
        text_prompt_speaker_b: Prompt text for speaker B.
        audio_prompt_speaker_a: File path of speaker A's audio prompt.
        audio_prompt_speaker_b: File path of speaker B's audio prompt.
        gen_conversation_input: Conversation text, one utterance per line.

    Returns:
        ``(generator.sample_rate, audio)`` where ``audio`` is an int16 NumPy
        array holding all utterances back to back.

    Raises:
        ValueError: If the conversation contains no non-blank line, or if the
            WATERMARK_KEY environment variable holds a non-integer token.
    """
    conversation_lines = [line.strip() for line in gen_conversation_input.strip().split("\n") if line.strip()]
    if not conversation_lines:
        # torch.cat cannot concatenate an empty list; fail early with an
        # exception type that infer() turns into a user-facing message.
        raise ValueError("Conversation is empty.")

    audio_prompt_a = prepare_prompt(text_prompt_speaker_a, 0, audio_prompt_speaker_a)
    audio_prompt_b = prepare_prompt(text_prompt_speaker_b, 1, audio_prompt_speaker_b)

    prompt_segments: list[Segment] = [audio_prompt_a, audio_prompt_b]
    generated_segments: list[Segment] = []

    for i, line in enumerate(conversation_lines):
        # Alternating speakers A and B, starting with A
        speaker_id = i % 2

        audio_tensor = generator.generate(
            text=line,
            speaker=speaker_id,
            context=prompt_segments + generated_segments,
            max_audio_length_ms=30_000,
        )
        generated_segments.append(Segment(text=line, speaker=speaker_id, audio=audio_tensor))

    # Concatenate all generations
    audio_tensors = [segment.audio for segment in generated_segments]
    audio_tensor = torch.cat(audio_tensors, dim=0)

    # Get the watermark key from environment; split() tolerates repeated or
    # surrounding whitespace, which split(" ") turned into empty tokens.
    watermark_key = [int(part) for part in os.environ["WATERMARK_KEY"].split()]

    # Watermarking; the result may come back at a different sample rate, so it
    # is resampled to the generator's rate afterwards.
    audio_tensor, wm_sample_rate = watermark(
        generator._watermarker, audio_tensor, generator.sample_rate, watermark_key
    )
    audio_tensor = torchaudio.functional.resample(
        audio_tensor, orig_freq=wm_sample_rate, new_freq=generator.sample_rate
    )

    # Convert to 16-bit int format. Clamp first: a sample at or above 1.0
    # scales to 32768 or more, which does not fit in int16.
    audio_array = (audio_tensor * 32768).clamp(-32768, 32767).to(torch.int16).cpu().numpy()

    return generator.sample_rate, audio_array
def prepare_prompt(text: str, speaker: int, audio_path: str) -> Segment:
    """Build the context ``Segment`` for one speaker's voice prompt.

    Args:
        text: Prompt text stored on the segment.
        speaker: Speaker id stored on the segment.
        audio_path: File path of the prompt audio.

    Returns:
        A ``Segment`` carrying ``text``, ``speaker`` and the loaded audio.

    Raises:
        ValueError: If ``audio_path`` is missing (``None`` or empty), e.g.
            when the audio prompt widget has been cleared.
    """
    if not audio_path:
        # Raised as ValueError so that infer() reports it to the user instead
        # of the failure surfacing inside the audio loader.
        raise ValueError(f"No audio prompt provided for speaker {speaker}.")
    audio_tensor, _ = load_prompt_audio(audio_path)
    return Segment(text=text, speaker=speaker, audio=audio_tensor)
def load_prompt_audio(audio_path: str) -> tuple[torch.Tensor, int]:
    """Load a prompt recording as mono audio at the generator's sample rate.

    Args:
        audio_path: Path of the audio file to load.

    Returns:
        ``(audio, sample_rate)`` where ``audio`` is a 1-D tensor and
        ``sample_rate`` is always ``generator.sample_rate``.
    """
    audio_tensor, sample_rate = torchaudio.load(audio_path)
    # torchaudio.load yields (channels, frames). Averaging over the channel
    # axis gives a 1-D tensor for mono and multi-channel files alike, whereas
    # squeeze(0) left stereo uploads two-dimensional.
    # NOTE(review): assumes the model wants 1-D mono audio — confirm against generator.Segment.
    audio_tensor = audio_tensor.mean(dim=0)
    if sample_rate != generator.sample_rate:
        audio_tensor = torchaudio.functional.resample(
            audio_tensor, orig_freq=sample_rate, new_freq=generator.sample_rate
        )
    return audio_tensor, generator.sample_rate
def create_speaker_prompt_ui(speaker_name: str):
    """Create the preset dropdown and custom-prompt widgets for one speaker.

    Args:
        speaker_name: Key into ``SPEAKER_PROMPTS`` used as the dropdown's
            initial selection and as the source of the initial text and audio.

    Returns:
        ``(dropdown, text box, audio input)`` in that order.
    """
    preset_names = list(SPEAKER_PROMPTS)
    speaker_dropdown = gr.Dropdown(
        choices=preset_names,
        label="Select a predefined speaker",
        value=speaker_name,
    )

    # The editable prompt lives in a collapsed accordion below the dropdown.
    with gr.Accordion("Or add your own voice prompt", open=False):
        preset = SPEAKER_PROMPTS[speaker_name]
        text_prompt_speaker = gr.Textbox(label="Speaker prompt", lines=4, value=preset["text"])
        audio_prompt_speaker = gr.Audio(label="Speaker prompt", type="filepath", value=preset["audio"])

    return speaker_dropdown, text_prompt_speaker, audio_prompt_speaker
def process_ebook(ebook_file):
    """Return the converted text for an uploaded ebook, or ``None`` without an upload.

    Args:
        ebook_file: Value of the ebook upload widget; ``None`` when nothing
            has been uploaded.

    Returns:
        The result of ``convert_ebook_to_txt(ebook_file)``, or ``None``.
    """
    return None if ebook_file is None else convert_ebook_to_txt(ebook_file)
def update_input_method(choice):
    """Toggle which input group is visible and reset the conversation value.

    Args:
        choice: ``"text_input"`` shows the text group; any other value shows
            the ebook group.

    Returns:
        A 3-tuple: visibility update for the text group, visibility update
        for the ebook group, and ``None`` as the new conversation value.
    """
    show_text_input = choice == "text_input"
    return (
        gr.update(visible=show_text_input),
        gr.update(visible=not show_text_input),
        None,
    )
# Page layout and event wiring. Components are created in display order.
with gr.Blocks() as app:
    gr.Markdown(SPACE_INTRO_TEXT)

    # Voice selection: one column per speaker.
    gr.Markdown("## Voices")
    with gr.Row():
        with gr.Column():
            gr.Markdown("### Speaker A")
            (
                speaker_a_dropdown,
                text_prompt_speaker_a,
                audio_prompt_speaker_a,
            ) = create_speaker_prompt_ui("conversational_a")
        with gr.Column():
            gr.Markdown("### Speaker B")
            (
                speaker_b_dropdown,
                text_prompt_speaker_b,
                audio_prompt_speaker_b,
            ) = create_speaker_prompt_ui("conversational_b")

    def update_audio(speaker):
        """Return the preset's audio path, or None for an unknown preset."""
        preset = SPEAKER_PROMPTS.get(speaker)
        return None if preset is None else preset["audio"]

    def update_text(speaker):
        """Return the preset's prompt text, or None for an unknown preset."""
        preset = SPEAKER_PROMPTS.get(speaker)
        return None if preset is None else preset["text"]

    # Picking a preset refreshes that speaker's audio and text prompt.
    # Registration order: audio for A and B first, then text for A and B.
    for handler, target_a, target_b in (
        (update_audio, audio_prompt_speaker_a, audio_prompt_speaker_b),
        (update_text, text_prompt_speaker_a, text_prompt_speaker_b),
    ):
        speaker_a_dropdown.change(fn=handler, inputs=[speaker_a_dropdown], outputs=[target_a])
        speaker_b_dropdown.change(fn=handler, inputs=[speaker_b_dropdown], outputs=[target_b])

    gr.Markdown(CONVO_INTRO_TEXT)

    # Radio button for selecting input method
    direct_text_label = "Direct text input"
    ebook_upload_label = "Upload ebook file"
    input_method = gr.Radio(
        [direct_text_label, ebook_upload_label],
        label="Choose input method",
        value=direct_text_label,
    )

    # Container for text input method
    with gr.Group(visible=True) as text_input_group:
        gen_conversation_input = gr.TextArea(label="Conversation", lines=20, value=DEFAULT_CONVERSATION)

    # Container for ebook upload method
    with gr.Group(visible=False) as ebook_input_group:
        ebook_file = gr.File(
            label="Upload ebook file (will be converted using Calibre)",
            file_types=[".epub", ".mobi", ".azw", ".azw3", ".fb2", ".pdf"],
        )
        process_ebook_btn = gr.Button("Process Ebook")

    # Map the radio label onto the keyword update_input_method expects.
    input_method.change(
        fn=lambda choice: update_input_method("text_input" if choice == direct_text_label else "ebook"),
        inputs=[input_method],
        outputs=[text_input_group, ebook_input_group, gen_conversation_input],
    )

    # The converted ebook text is written into the conversation box.
    process_ebook_btn.click(
        fn=process_ebook,
        inputs=[ebook_file],
        outputs=[gen_conversation_input],
    )

    generate_btn = gr.Button("Generate conversation", variant="primary")
    gr.Markdown("GPU time limited to 3 minutes, for longer usage duplicate the space.")
    audio_output = gr.Audio(label="Synthesized audio")

    inference_inputs = [
        text_prompt_speaker_a,
        text_prompt_speaker_b,
        audio_prompt_speaker_a,
        audio_prompt_speaker_b,
        gen_conversation_input,
    ]
    generate_btn.click(infer, inputs=inference_inputs, outputs=[audio_output])

app.launch(ssr_mode=True)