File size: 11,674 Bytes
96dc011
956a3cd
 
 
 
 
 
fc6d12e
 
96dc011
 
 
 
 
 
 
 
 
7dcf55b
96dc011
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45e163c
96dc011
 
fc6d12e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96dc011
 
 
 
 
 
 
d794e1d
7dcf55b
d794e1d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96dc011
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d794e1d
96dc011
 
 
 
 
 
 
ae03f2c
 
 
7dcf55b
96dc011
ae03f2c
96dc011
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fc6d12e
 
 
 
 
 
 
 
 
 
 
 
 
 
96dc011
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fc6d12e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96dc011
d794e1d
96dc011
 
 
 
 
 
 
 
 
 
 
 
 
 
fc6d12e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
import os

# Set a default watermark key to avoid the NoneType error
# Do this BEFORE any imports
if "WATERMARK_KEY" not in os.environ:
    os.environ["WATERMARK_KEY"] = "0 0 0 0"  # Default placeholder

import subprocess
import tempfile

import gradio as gr
import numpy as np
import spaces
import torch
import torchaudio
from generator import Segment, load_csm_1b
from watermarking import watermark

# Simplified environment variables handling
gpu_timeout = int(os.getenv("GPU_TIMEOUT", 60))

SPACE_INTRO_TEXT = """\
# Sesame CSM 1B

Generate from CSM 1B (Conversational Speech Model). 
Code is available on GitHub: [SesameAILabs/csm](https://github.com/SesameAILabs/csm). 
Checkpoint is [hosted on HuggingFace](https://huggingface.co/sesame/csm-1b).

---

"""

CONVO_INTRO_TEXT = """\
## Conversation content

Each line is an utterance in the conversation to generate. Speakers alternate between A and B, starting with speaker A.
"""

DEFAULT_CONVERSATION = """\
Hey how are you doing.
Pretty good, pretty good.
I'm great, so happy to be speaking to you.
Me too, this is some cool stuff huh?
Yeah, I've been reading more about speech generation, and it really seems like context is important.
Definitely.
"""

SPEAKER_PROMPTS = {
    "conversational_a": {
        "text": (
            "like revising for an exam I'd have to try and like keep up the momentum because I'd "
            "start really early I'd be like okay I'm gonna start revising now and then like "
            "you're revising for ages and then I just like start losing steam I didn't do that "
            "for the exam we had recently to be fair that was a more of a last minute scenario "
            "but like yeah I'm trying to like yeah I noticed this yesterday that like Mondays I "
            "sort of start the day with this not like a panic but like a"
        ),
        "audio": "prompts/conversational_a.wav",
    },
    "conversational_b": {
        "text": (
            "like a super Mario level. Like it's very like high detail. And like, once you get "
            "into the park, it just like, everything looks like a computer game and they have all "
            "these, like, you know, if, if there's like a, you know, like in a Mario game, they "
            "will have like a question block. And if you like, you know, punch it, a coin will "
            "come out. So like everyone, when they come into the park, they get like this little "
            "bracelet and then you can go punching question blocks around."
        ),
        "audio": "prompts/conversational_b.wav",
    },
    "read_speech_a": {
        "text": (
            "And Lake turned round upon me, a little abruptly, his odd yellowish eyes, a little "
            "like those of the sea eagle, and the ghost of his smile that flickered on his "
            "singularly pale face, with a stern and insidious look, confronted me."
        ),
        "audio": "prompts/read_speech_a.wav",
    },
    "read_speech_b": {
        "text": (
            "He was such a big boy that he wore high boots and carried a jack knife. He gazed and "
            "gazed at the cap, and could not keep from fingering the blue tassel."
        ),
        "audio": "prompts/read_speech_b.wav",
    },
}

device = "cuda" if torch.cuda.is_available() else "cpu"
generator = load_csm_1b(device=device)


def convert_ebook_to_txt(ebook_path):
    """Convert an ebook file to text using Calibre's ebook-convert."""
    if not ebook_path:
        return None
    
    # Create a temporary file for the output
    with tempfile.NamedTemporaryFile(suffix='.txt', delete=False) as temp_txt:
        txt_path = temp_txt.name
    
    try:
        # Run ebook-convert from Calibre
        subprocess.run(
            ["ebook-convert", ebook_path, txt_path], 
            check=True, 
            stdout=subprocess.PIPE, 
            stderr=subprocess.PIPE
        )
        
        # Read the converted text
        with open(txt_path, 'r', encoding='utf-8') as f:
            text_content = f.read()
        
        # Clean up
        os.unlink(txt_path)
        
        # Format the text into alternating lines for conversation
        lines = [line.strip() for line in text_content.split('.') if line.strip()]
        formatted_lines = []
        
        # Take up to 20 sentences to avoid extremely long conversations
        for i, line in enumerate(lines[:20]):
            formatted_lines.append(line + ".")
        
        return "\n".join(formatted_lines)
    
    except Exception as e:
        if os.path.exists(txt_path):
            os.unlink(txt_path)
        raise gr.Error(f"Error converting ebook: {str(e)}")


@spaces.GPU(duration=gpu_timeout)
def infer(
    text_prompt_speaker_a,
    text_prompt_speaker_b,
    audio_prompt_speaker_a,
    audio_prompt_speaker_b,
    gen_conversation_input,
) -> tuple[np.ndarray, int]:
    # Estimate token limit
    if len(gen_conversation_input.strip() + text_prompt_speaker_a.strip() + text_prompt_speaker_b.strip()) >= 2000:
        raise gr.Error("Prompts and conversation too long.", duration=30)

    try:
        return _infer(
            text_prompt_speaker_a,
            text_prompt_speaker_b,
            audio_prompt_speaker_a,
            audio_prompt_speaker_b,
            gen_conversation_input,
        )
    except ValueError as e:
        raise gr.Error(f"Error generating audio: {e}", duration=120)


def _infer(
    text_prompt_speaker_a,
    text_prompt_speaker_b,
    audio_prompt_speaker_a,
    audio_prompt_speaker_b,
    gen_conversation_input,
) -> tuple[np.ndarray, int]:
    audio_prompt_a = prepare_prompt(text_prompt_speaker_a, 0, audio_prompt_speaker_a)
    audio_prompt_b = prepare_prompt(text_prompt_speaker_b, 1, audio_prompt_speaker_b)

    prompt_segments: list[Segment] = [audio_prompt_a, audio_prompt_b]
    generated_segments: list[Segment] = []

    conversation_lines = [line.strip() for line in gen_conversation_input.strip().split("\n") if line.strip()]
    for i, line in enumerate(conversation_lines):
        # Alternating speakers A and B, starting with A
        speaker_id = i % 2

        audio_tensor = generator.generate(
            text=line,
            speaker=speaker_id,
            context=prompt_segments + generated_segments,
            max_audio_length_ms=30_000,
        )
        generated_segments.append(Segment(text=line, speaker=speaker_id, audio=audio_tensor))

    # Concatenate all generations and convert to 16-bit int format
    audio_tensors = [segment.audio for segment in generated_segments]
    audio_tensor = torch.cat(audio_tensors, dim=0)

    # Get the watermark key from environment
    watermark_key = list(map(int, os.getenv("WATERMARK_KEY").split(" ")))
    
    # Watermarking
    audio_tensor, wm_sample_rate = watermark(
        generator._watermarker, audio_tensor, generator.sample_rate, watermark_key
    )
    audio_tensor = torchaudio.functional.resample(
        audio_tensor, orig_freq=wm_sample_rate, new_freq=generator.sample_rate
    )

    audio_array = (audio_tensor * 32768).to(torch.int16).cpu().numpy()

    return generator.sample_rate, audio_array


def prepare_prompt(text: str, speaker: int, audio_path: str) -> Segment:
    audio_tensor, _ = load_prompt_audio(audio_path)
    return Segment(text=text, speaker=speaker, audio=audio_tensor)


def load_prompt_audio(audio_path: str) -> torch.Tensor:
    audio_tensor, sample_rate = torchaudio.load(audio_path)
    audio_tensor = audio_tensor.squeeze(0)
    if sample_rate != generator.sample_rate:
        audio_tensor = torchaudio.functional.resample(
            audio_tensor, orig_freq=sample_rate, new_freq=generator.sample_rate
        )
    return audio_tensor, generator.sample_rate


def create_speaker_prompt_ui(speaker_name: str):
    speaker_dropdown = gr.Dropdown(
        choices=list(SPEAKER_PROMPTS.keys()), label="Select a predefined speaker", value=speaker_name
    )
    with gr.Accordion("Or add your own voice prompt", open=False):
        text_prompt_speaker = gr.Textbox(label="Speaker prompt", lines=4, value=SPEAKER_PROMPTS[speaker_name]["text"])
        audio_prompt_speaker = gr.Audio(
            label="Speaker prompt", type="filepath", value=SPEAKER_PROMPTS[speaker_name]["audio"]
        )

    return speaker_dropdown, text_prompt_speaker, audio_prompt_speaker


def process_ebook(ebook_file):
    if ebook_file is None:
        return None
    text_content = convert_ebook_to_txt(ebook_file)
    return text_content


def update_input_method(choice):
    if choice == "text_input":
        return gr.update(visible=True), gr.update(visible=False), None
    else:
        return gr.update(visible=False), gr.update(visible=True), None


with gr.Blocks() as app:
    gr.Markdown(SPACE_INTRO_TEXT)
    gr.Markdown("## Voices")
    with gr.Row():
        with gr.Column():
            gr.Markdown("### Speaker A")
            speaker_a_dropdown, text_prompt_speaker_a, audio_prompt_speaker_a = create_speaker_prompt_ui(
                "conversational_a"
            )

        with gr.Column():
            gr.Markdown("### Speaker B")
            speaker_b_dropdown, text_prompt_speaker_b, audio_prompt_speaker_b = create_speaker_prompt_ui(
                "conversational_b"
            )

    def update_audio(speaker):
        if speaker in SPEAKER_PROMPTS:
            return SPEAKER_PROMPTS[speaker]["audio"]
        return None

    def update_text(speaker):
        if speaker in SPEAKER_PROMPTS:
            return SPEAKER_PROMPTS[speaker]["text"]
        return None

    speaker_a_dropdown.change(fn=update_audio, inputs=[speaker_a_dropdown], outputs=[audio_prompt_speaker_a])
    speaker_b_dropdown.change(fn=update_audio, inputs=[speaker_b_dropdown], outputs=[audio_prompt_speaker_b])

    speaker_a_dropdown.change(fn=update_text, inputs=[speaker_a_dropdown], outputs=[text_prompt_speaker_a])
    speaker_b_dropdown.change(fn=update_text, inputs=[speaker_b_dropdown], outputs=[text_prompt_speaker_b])

    gr.Markdown(CONVO_INTRO_TEXT)
    
    # Radio button for selecting input method
    input_method = gr.Radio(
        ["Direct text input", "Upload ebook file"], 
        label="Choose input method", 
        value="Direct text input"
    )
    
    # Container for text input method
    with gr.Group(visible=True) as text_input_group:
        gen_conversation_input = gr.TextArea(label="Conversation", lines=20, value=DEFAULT_CONVERSATION)
    
    # Container for ebook upload method
    with gr.Group(visible=False) as ebook_input_group:
        ebook_file = gr.File(label="Upload ebook file (will be converted using Calibre)", file_types=[".epub", ".mobi", ".azw", ".azw3", ".fb2", ".pdf"])
        process_ebook_btn = gr.Button("Process Ebook")
    
    input_method.change(
        fn=lambda choice: update_input_method("text_input" if choice == "Direct text input" else "ebook"),
        inputs=[input_method],
        outputs=[text_input_group, ebook_input_group, gen_conversation_input]
    )
    
    process_ebook_btn.click(
        fn=process_ebook,
        inputs=[ebook_file],
        outputs=[gen_conversation_input]
    )
    
    generate_btn = gr.Button("Generate conversation", variant="primary")
    gr.Markdown("GPU time limited to 3 minutes, for longer usage duplicate the space.")
    audio_output = gr.Audio(label="Synthesized audio")

    generate_btn.click(
        infer,
        inputs=[
            text_prompt_speaker_a,
            text_prompt_speaker_b,
            audio_prompt_speaker_a,
            audio_prompt_speaker_b,
            gen_conversation_input,
        ],
        outputs=[audio_output],
    )

app.launch(ssr_mode=True)