"""
Spark-TTS by SparkAudio – Enhanced eBook Converter

Licensed under the Apache License, Version 2.0.
(See accompanying LICENSE file for details)
"""

import argparse
import logging
import os
import platform
import re
import subprocess
from datetime import datetime

import ebooklib
import gradio as gr
import nltk
import soundfile as sf
import torch
from bs4 import BeautifulSoup
from ebooklib import epub
from nltk.tokenize import sent_tokenize
from pydub import AudioSegment
from tqdm import tqdm

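# hf_transfer is optional; disabling it keeps Hugging Face downloads on the default HTTP backend.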
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "0" |
|
|
|
|
|
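# Punkt sentence-tokenizer data required by sent_tokenize below.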
nltk.download('punkt')
nltk.download('punkt_tab')

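# Fetch the pretrained model on first run, if huggingface_hub is available.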
try:
    from huggingface_hub import snapshot_download

    if not os.path.exists("pretrained_models/Spark-TTS-0.5B"):
        print("Downloading pretrained model from Hugging Face...")
        snapshot_download("SparkAudio/Spark-TTS-0.5B", local_dir="pretrained_models/Spark-TTS-0.5B")
except ImportError:
    print("huggingface_hub is not installed. Make sure the pretrained model is already available.")

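# These modules ship with the Spark-TTS repository; run this script from the repo root
# (or add the repo to PYTHONPATH) so they resolve.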
from cli.SparkTTS import SparkTTS
from sparktts.utils.token_parser import LEVELS_MAP_UI


def initialize_model(model_dir="pretrained_models/Spark-TTS-0.5B", device=0):
    """Load the Spark-TTS model once at startup."""
    logging.info(f"Loading model from: {model_dir}")
    if platform.system() == "Darwin":
        device = torch.device("cpu")
        logging.info("Running on macOS; CUDA not available, using CPU")
    elif torch.cuda.is_available():
        device = torch.device(f"cuda:{device}")
        logging.info(f"Using CUDA device: {device}")
    else:
        device = torch.device("cpu")
        logging.info("GPU acceleration not available, using CPU")
    model = SparkTTS(model_dir, device)
    return model


def run_tts(text, model, prompt_text=None, prompt_speech=None, gender=None, pitch=None, speed=None, save_dir="results"):
    """Perform TTS inference and save the generated audio fragment.

    Returns the full path of the saved .wav file.
    """
    logging.info(f"Saving audio to: {save_dir}")
    if prompt_text is not None and len(prompt_text) < 2:
        prompt_text = None
    os.makedirs(save_dir, exist_ok=True)
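    # Microsecond-resolution timestamp keeps fragment filenames unique across rapid successive calls.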
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
    save_path = os.path.join(save_dir, f"{timestamp}.wav")
    logging.info("Starting inference...")
    with torch.no_grad():
        wav = model.inference(
            text,
            prompt_speech,
            prompt_text,
            gender,
            pitch,
            speed,
        )
    sf.write(save_path, wav, samplerate=16000)
    logging.info(f"Audio saved at: {save_path}")
    return save_path


def ensure_directory(directory_path):
    os.makedirs(directory_path, exist_ok=True)


def convert_to_epub(input_path, output_path):
    """Convert an eBook (mobi/pdf/etc.) to EPUB using Calibre's ebook-convert."""
    try:
        subprocess.run(['ebook-convert', input_path, output_path], check=True)
        return True
    except FileNotFoundError:
        logging.error("ebook-convert not found. Install Calibre and make sure it is on the PATH.")
        return False
    except subprocess.CalledProcessError as e:
        logging.error(f"ebook-convert failed: {e}")
        return False


def save_chapters_as_text(epub_path, chapters_dir):
    """Extract HTML documents from the EPUB and save each as a text file (one per chapter)."""
    book = epub.read_epub(epub_path)
    chapter_counter = 0
    for item in book.get_items():
        if item.get_type() == ebooklib.ITEM_DOCUMENT:
            soup = BeautifulSoup(item.get_content(), 'html.parser')
            text = soup.get_text()
            if text.strip():
                chapter_file = os.path.join(chapters_dir, f"chapter_{chapter_counter}.txt")
                with open(chapter_file, 'w', encoding='utf-8') as f:
                    f.write(text)
                chapter_counter += 1
    return chapter_counter


def create_chapter_labeled_book(ebook_file_path):
    """Convert the uploaded eBook into chapters saved as text files."""
    working_dir = os.path.join(".", "Working_files")
    ensure_directory(working_dir)
    temp_epub = os.path.join(working_dir, "temp.epub")
    chapters_dir = os.path.join(working_dir, "chapters")
    ensure_directory(chapters_dir)
    if os.path.exists(temp_epub):
        os.remove(temp_epub)
    if convert_to_epub(ebook_file_path, temp_epub):
        num_chapters = save_chapters_as_text(temp_epub, chapters_dir)
        logging.info(f"Extracted {num_chapters} chapters.")
        return chapters_dir
    else:
        raise Exception("Failed to convert ebook to EPUB.")


def split_long_sentence(sentence, max_length=250):
    """Split a long sentence into smaller fragments at the last space before max_length."""
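    # Example: split_long_sentence("hello brave new world", max_length=12)
    #          -> ["hello brave", "new world"]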
    parts = []
    while len(sentence) > max_length:
        split_at = sentence.rfind(' ', 0, max_length)
        if split_at == -1:
            split_at = max_length
        parts.append(sentence[:split_at].strip())
        sentence = sentence[split_at:].strip()
    parts.append(sentence)
    return parts


def combine_wav_files(file_list, output_file):
    """Combine a list of WAV files into one WAV file."""
    combined = AudioSegment.empty()
    for f in file_list:
        seg = AudioSegment.from_wav(f)
        combined += seg
    combined.export(output_file, format="wav")


def convert_ebook_to_audiobook(ebook_file_path, model, gender=None, pitch=None, speed=None, prompt_text=None, prompt_speech=None):
    """Convert an entire eBook into an audiobook WAV file.

    Processes chapters, splits sentences, runs TTS for each fragment,
    and combines all fragments with brief silences between chapters.
    """
    chapters_dir = create_chapter_labeled_book(ebook_file_path)
    chapter_files = sorted(
        [os.path.join(chapters_dir, f) for f in os.listdir(chapters_dir) if f.startswith("chapter_") and f.endswith(".txt")],
        key=lambda x: int(re.findall(r'\d+', os.path.basename(x))[0])
    )
    output_dir = os.path.join(".", "Audiobooks")
    ensure_directory(output_dir)
    chapter_audio_files = []
    temp_audio_dir = os.path.join(".", "Working_files", "temp_audio")
    ensure_directory(temp_audio_dir)

    for chapter_file in tqdm(chapter_files, desc="Processing Chapters"):
        with open(chapter_file, 'r', encoding='utf-8') as f:
            text = f.read()
        sentences = sent_tokenize(text)
        fragment_audio_files = []
        counter = 0

        for sentence in tqdm(sentences, desc=f"Processing {os.path.basename(chapter_file)}", leave=False):
            fragments = split_long_sentence(sentence)
            for frag in fragments:
                if frag:
                    frag_wav = run_tts(frag, model, prompt_text=prompt_text, prompt_speech=prompt_speech,
                                       gender=gender, pitch=pitch, speed=speed, save_dir=temp_audio_dir)
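                    # Rename the timestamped output to a deterministic, chapter-scoped filename.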
                    new_frag_wav = os.path.join(temp_audio_dir, f"{os.path.basename(chapter_file)}_{counter}.wav")
                    os.rename(frag_wav, new_frag_wav)
                    fragment_audio_files.append(new_frag_wav)
                    counter += 1
        chapter_audio = os.path.join(temp_audio_dir, f"{os.path.basename(chapter_file)}_combined.wav")
        combine_wav_files(fragment_audio_files, chapter_audio)
        chapter_audio_files.append(chapter_audio)

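    # Stitch the chapters together with a two-second pause between them.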
    silence = AudioSegment.silent(duration=2000)
    final_audio = AudioSegment.empty()
    for f in chapter_audio_files:
        seg = AudioSegment.from_wav(f)
        final_audio += seg + silence
    final_output = os.path.join(output_dir, os.path.splitext(os.path.basename(ebook_file_path))[0] + ".wav")
    final_audio.export(final_output, format="wav")
    return final_output


def build_ui(model_dir, device=0):
    model = initialize_model(model_dir, device=device)

    def voice_clone(text, prompt_text, prompt_wav_upload, prompt_wav_record):
        # Prefer the uploaded prompt over the recorded one.
        if prompt_wav_upload or prompt_wav_record:
            prompt_speech = prompt_wav_upload if prompt_wav_upload else prompt_wav_record
            gender_used = None
            pitch_used = None
            speed_used = None
        else:
            # No reference audio: fall back to a default controllable voice. Controlled
            # generation is driven by gender plus pitch/speed levels (as in voice_creation),
            # so default both to the mid-scale level.
            prompt_speech = None
            gender_used = "male"
            pitch_used = LEVELS_MAP_UI[3]
            speed_used = LEVELS_MAP_UI[3]
        return run_tts(text, model, prompt_text=prompt_text, prompt_speech=prompt_speech,
                       gender=gender_used, pitch=pitch_used, speed=speed_used)

    def voice_creation(text, gender, pitch, speed):
        pitch_val = LEVELS_MAP_UI[int(pitch)]
        speed_val = LEVELS_MAP_UI[int(speed)]
        return run_tts(text, model, gender=gender, pitch=pitch_val, speed=speed_val)

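    # As in voice cloning, reference audio (if provided) takes precedence over the gender/pitch/speed controls.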
    def ebook_conversion(ebook_file, gender, pitch, speed, prompt_text, prompt_wav_upload, prompt_wav_record):
        if prompt_wav_upload or prompt_wav_record:
            prompt_speech = prompt_wav_upload if prompt_wav_upload else prompt_wav_record
            gender_used = None
            pitch_val = None
            speed_val = None
        else:
            prompt_speech = None
            gender_used = gender
            pitch_val = LEVELS_MAP_UI[int(pitch)]
            speed_val = LEVELS_MAP_UI[int(speed)]
        ebook_file_path = ebook_file.name if hasattr(ebook_file, "name") else ebook_file
        return convert_ebook_to_audiobook(
            ebook_file_path, model,
            gender=gender_used, pitch=pitch_val, speed=speed_val,
            prompt_text=prompt_text, prompt_speech=prompt_speech
        )

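    # UI layout: three tabs (Voice Clone, Voice Creation, eBook Conversion).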
    with gr.Blocks() as demo:
        gr.HTML('<h1 style="text-align: center;">Spark-TTS by SparkAudio – Enhanced eBook Converter</h1>')
        with gr.Tabs():
            with gr.TabItem("Voice Clone"):
                gr.Markdown("### Upload reference audio or record a prompt")
                with gr.Row():
                    prompt_wav_upload = gr.Audio(sources="upload", type="filepath",
                                                 label="Upload Prompt Audio (>=16kHz)")
                    prompt_wav_record = gr.Audio(sources="microphone", type="filepath",
                                                 label="Record Prompt Audio")
                with gr.Row():
                    text_input = gr.Textbox(label="Text", lines=3, placeholder="Enter text")
                    prompt_text_input = gr.Textbox(label="Prompt Text (Optional)", lines=3,
                                                   placeholder="Enter prompt text")
                audio_output_clone = gr.Audio(label="Generated Audio", autoplay=True, streaming=True)
                btn_clone = gr.Button("Generate Voice Clone")
                btn_clone.click(
                    voice_clone,
                    inputs=[text_input, prompt_text_input, prompt_wav_upload, prompt_wav_record],
                    outputs=audio_output_clone
                )

            with gr.TabItem("Voice Creation"):
                gr.Markdown("### Create a custom voice")
                with gr.Row():
                    gender = gr.Radio(choices=["male", "female"], value="male", label="Gender")
                    pitch = gr.Slider(minimum=1, maximum=5, step=1, value=3, label="Pitch")
                    speed = gr.Slider(minimum=1, maximum=5, step=1, value=3, label="Speed")
                text_input_creation = gr.Textbox(label="Input Text", lines=3,
                                                 placeholder="Enter text",
                                                 value="Generate custom voice sample.")
                audio_output_creation = gr.Audio(label="Generated Audio", autoplay=True, streaming=True)
                btn_create = gr.Button("Create Voice")
                btn_create.click(
                    voice_creation,
                    inputs=[text_input_creation, gender, pitch, speed],
                    outputs=audio_output_creation
                )

            with gr.TabItem("eBook Conversion"):
                gr.Markdown("### Convert an eBook into an Audiobook")
                ebook_file = gr.File(label="Upload eBook File (e.g., epub, mobi, pdf, txt)",
                                     file_types=[".epub", ".mobi", ".pdf", ".txt"])
                with gr.Row():
                    gender_ebook = gr.Radio(choices=["male", "female"], value="male", label="Gender")
                    pitch_ebook = gr.Slider(minimum=1, maximum=5, step=1, value=3, label="Pitch")
                    speed_ebook = gr.Slider(minimum=1, maximum=5, step=1, value=3, label="Speed")
                prompt_text_ebook = gr.Textbox(label="Prompt Text (Optional)", lines=3,
                                               placeholder="Enter prompt text for voice cloning")
                with gr.Row():
                    prompt_wav_upload_ebook = gr.Audio(sources="upload", type="filepath",
                                                       label="Upload Prompt Audio (>=16kHz)")
                    prompt_wav_record_ebook = gr.Audio(sources="microphone", type="filepath",
                                                       label="Record Prompt Audio")
                audio_output_ebook = gr.Audio(label="Generated Audiobook", autoplay=True, streaming=True)
                btn_ebook = gr.Button("Convert eBook")
                btn_ebook.click(
                    ebook_conversion,
                    inputs=[ebook_file, gender_ebook, pitch_ebook, speed_ebook, prompt_text_ebook,
                            prompt_wav_upload_ebook, prompt_wav_record_ebook],
                    outputs=audio_output_ebook
                )
    return demo


def parse_arguments():
    parser = argparse.ArgumentParser(description="Spark-TTS eBook Converter")
    parser.add_argument("--model_dir", type=str, default="pretrained_models/Spark-TTS-0.5B",
                        help="Path to the model directory.")
    parser.add_argument("--device", type=int, default=0, help="GPU device id")
    parser.add_argument("--server_name", type=str, default="0.0.0.0", help="Server host")
    parser.add_argument("--server_port", type=int, default=7860, help="Server port")
    return parser.parse_args()

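# Typical invocation (replace <this_script>.py with whatever this file is saved as):
#   python <this_script>.py --model_dir pretrained_models/Spark-TTS-0.5B --server_port 7860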
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)  # surface the logging.info progress messages above
    args = parse_arguments()
    demo = build_ui(args.model_dir, args.device)
    demo.launch(server_name=args.server_name, server_port=args.server_port)