drewThomasson's picture
Update app.py
d6b74a5 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Spark-TTS by SparkAudio – Enhanced eBook Converter
Licensed under the Apache License, Version 2.0.
(See accompanying LICENSE file for details)
"""
import os
import torch
import soundfile as sf
import logging
import argparse
import platform
import subprocess
from datetime import datetime
import gradio as gr
# For eBook processing
import re
import ebooklib
from ebooklib import epub
from bs4 import BeautifulSoup
import nltk
from nltk.tokenize import sent_tokenize
# For audio combination
from pydub import AudioSegment
# For progress bars
from tqdm import tqdm
# Set HF_HUB_ENABLE_HF_TRANSFER to "0" before huggingface_hub is imported below.
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "0"

# Download the NLTK tokenizer data that sent_tokenize relies on.
for _nltk_resource in ('punkt', 'punkt_tab'):
    nltk.download(_nltk_resource)

# Optional: fetch the pretrained model from Hugging Face when no local copy exists.
# If huggingface_hub is missing, only a notice is printed and startup continues.
try:
    from huggingface_hub import snapshot_download

    _model_already_present = os.path.exists("pretrained_models/Spark-TTS-0.5B")
    if not _model_already_present:
        print("Downloading pretrained model from Hugging Face...")
        snapshot_download("SparkAudio/Spark-TTS-0.5B", local_dir="pretrained_models/Spark-TTS-0.5B")
except ImportError:
    print("huggingface_hub is not installed. Make sure the pretrained model is already available.")
###########################
# Spark-TTS Core Functions
###########################
from cli.SparkTTS import SparkTTS
from sparktts.utils.token_parser import LEVELS_MAP_UI # This maps UI slider values to model values
def initialize_model(model_dir="pretrained_models/Spark-TTS-0.5B", device=0):
    """Construct the Spark-TTS model once, on the device chosen for this host.

    Args:
        model_dir: Directory containing the pretrained Spark-TTS checkpoint.
        device: CUDA device index; only used when a CUDA device is selected.

    Returns:
        The SparkTTS instance built for the selected torch device.
    """
    logging.info(f"Loading model from: {model_dir}")

    # macOS is pinned to the CPU without probing CUDA at all; every other
    # platform uses CUDA when torch reports it as available.
    use_cuda = platform.system() != "Darwin" and torch.cuda.is_available()
    if use_cuda:
        target_device = torch.device(f"cuda:{device}")
        logging.info(f"Using CUDA device: {target_device}")
    else:
        target_device = torch.device("cpu")
        logging.info("GPU acceleration not available, using CPU")

    return SparkTTS(model_dir, target_device)
def run_tts(text, model, prompt_text=None, prompt_speech=None, gender=None, pitch=None, speed=None, save_dir="results"):
    """Synthesize one piece of text and write it to a timestamped WAV file.

    Args:
        text: Text to synthesize.
        model: Object exposing ``inference(text, prompt_speech, prompt_text,
            gender, pitch, speed)``.
        prompt_text: Optional prompt transcript; values shorter than two
            characters are treated as if none was given.
        prompt_speech: Optional prompt audio forwarded to the model.
        gender: Optional gender setting forwarded to the model.
        pitch: Optional pitch setting forwarded to the model.
        speed: Optional speed setting forwarded to the model.
        save_dir: Output directory; created if it does not exist.

    Returns:
        Full path of the saved .wav file.
    """
    logging.info(f"Saving audio to: {save_dir}")

    transcript_too_short = prompt_text is not None and len(prompt_text) < 2
    transcript = None if transcript_too_short else prompt_text

    os.makedirs(save_dir, exist_ok=True)
    # The microsecond-resolution timestamp doubles as the file name.
    stamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
    save_path = os.path.join(save_dir, stamp + ".wav")

    logging.info("Starting inference...")
    with torch.no_grad():
        # Arguments are positional, in the order the model expects them.
        wav = model.inference(text, prompt_speech, transcript, gender, pitch, speed)

    # Output is written at a fixed 16 kHz sample rate.
    sf.write(save_path, wav, samplerate=16000)
    logging.info(f"Audio saved at: {save_path}")
    return save_path
##############################
# eBook-to-Audiobook Functions
##############################
def ensure_directory(directory_path):
    """Create ``directory_path`` (and any missing parents) if it is absent.

    Uses ``exist_ok=True`` rather than an exists-then-create check, so the
    call cannot fail when the directory appears between the check and the
    creation.

    Args:
        directory_path: Directory to create.

    Raises:
        FileExistsError: If the path exists but is not a directory.
    """
    os.makedirs(directory_path, exist_ok=True)
def convert_to_epub(input_path, output_path):
    """Convert an eBook (mobi/pdf/etc.) to EPUB using Calibre's ebook-convert.

    Args:
        input_path: Path of the source eBook.
        output_path: Path where the EPUB should be written.

    Returns:
        True when ebook-convert exits successfully. False when it exits with
        a non-zero status or cannot be started at all (for example when the
        executable is not on PATH).
    """
    command = ['ebook-convert', input_path, output_path]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as e:
        logging.error(f"ebook-convert failed: {e}")
        return False
    except OSError as e:
        # Raised when the executable cannot be launched, e.g. FileNotFoundError
        # if Calibre is not installed. Report it like any other failure.
        logging.error(f"Could not run ebook-convert (is Calibre installed and on PATH?): {e}")
        return False
    return True
def save_chapters_as_text(epub_path, chapters_dir):
    """Write the text of every non-blank EPUB document to its own file.

    Files are named ``chapter_<n>.txt`` inside ``chapters_dir``, numbered
    from 0 in the order the documents are returned by the book.

    Args:
        epub_path: Path of the EPUB to read.
        chapters_dir: Existing directory that receives the text files.

    Returns:
        Number of chapter files written.
    """
    book = epub.read_epub(epub_path)
    written = 0
    for item in book.get_items():
        # Only HTML document items carry chapter text.
        if item.get_type() != ebooklib.ITEM_DOCUMENT:
            continue
        text = BeautifulSoup(item.get_content(), 'html.parser').get_text()
        # Whitespace-only documents are skipped and do not consume a number.
        if not text.strip():
            continue
        target = os.path.join(chapters_dir, f"chapter_{written}.txt")
        with open(target, 'w', encoding='utf-8') as handle:
            handle.write(text)
        written += 1
    return written
def create_chapter_labeled_book(ebook_file_path):
    """Convert the uploaded eBook into chapters saved as text files.

    The book is first converted to ``./Working_files/temp.epub`` and its
    documents are then extracted into ``./Working_files/chapters``.

    Args:
        ebook_file_path: Path of the eBook to process.

    Returns:
        Path of the directory containing the ``chapter_<n>.txt`` files.

    Raises:
        RuntimeError: If the eBook cannot be converted to EPUB.
    """
    working_dir = os.path.join(".", "Working_files")
    ensure_directory(working_dir)
    temp_epub = os.path.join(working_dir, "temp.epub")
    chapters_dir = os.path.join(working_dir, "chapters")
    ensure_directory(chapters_dir)

    # The chapters directory is reused across runs. Remove chapter files left
    # by an earlier conversion so they are not read as part of this book.
    for leftover in os.listdir(chapters_dir):
        if leftover.startswith("chapter_") and leftover.endswith(".txt"):
            os.remove(os.path.join(chapters_dir, leftover))

    # Drop any EPUB left over from a previous conversion.
    if os.path.exists(temp_epub):
        os.remove(temp_epub)

    if not convert_to_epub(ebook_file_path, temp_epub):
        raise RuntimeError("Failed to convert ebook to EPUB.")

    num_chapters = save_chapters_as_text(temp_epub, chapters_dir)
    logging.info(f"Extracted {num_chapters} chapters.")
    return chapters_dir
def split_long_sentence(sentence, max_length=250):
    """Split a long sentence into smaller fragments at the last space before max_length.

    When the first ``max_length`` characters contain no space, the text is cut
    at exactly ``max_length`` characters. Each fragment is stripped of
    surrounding whitespace.

    Args:
        sentence: Text to split.
        max_length: Maximum fragment length in characters; must be at least 1.

    Returns:
        List of fragments in order. A sentence no longer than ``max_length``
        is returned unchanged as the only element. Fragments can be empty
        strings (for example when the text starts with a space), so callers
        should skip falsy entries.

    Raises:
        ValueError: If ``max_length`` is smaller than 1.
    """
    # A non-positive limit could never shrink the remaining text, so the loop
    # below would not terminate.
    if max_length < 1:
        raise ValueError("max_length must be a positive integer")

    parts = []
    while len(sentence) > max_length:
        split_at = sentence.rfind(' ', 0, max_length)
        if split_at == -1:
            # No space available: fall back to a hard cut.
            split_at = max_length
        parts.append(sentence[:split_at].strip())
        sentence = sentence[split_at:].strip()
    parts.append(sentence)
    return parts
def combine_wav_files(file_list, output_file):
    """Concatenate WAV files, in list order, into a single WAV file.

    Args:
        file_list: Paths of the WAV files to join.
        output_file: Path of the combined WAV to write.
    """
    # Start from an empty segment and append each loaded file in turn.
    segments = (AudioSegment.from_wav(path) for path in file_list)
    merged = sum(segments, AudioSegment.empty())
    merged.export(output_file, format="wav")
def convert_ebook_to_audiobook(ebook_file_path, model, gender=None, pitch=None, speed=None, prompt_text=None, prompt_speech=None):
    """Convert an entire eBook into an audiobook WAV file.

    The book is split into chapter text files. Each chapter is tokenized into
    sentences, over-long sentences are broken into fragments, every non-empty
    fragment is synthesized with run_tts, and the fragments are joined into
    one WAV per chapter. The chapters are then concatenated, with two seconds
    of silence appended after every chapter (including the last one).

    Intermediate audio is written to ./Working_files/temp_audio and is not
    deleted afterwards.

    Args:
        ebook_file_path: Path of the eBook to convert.
        model: TTS model passed through to run_tts.
        gender: Optional gender setting passed through to run_tts.
        pitch: Optional pitch setting passed through to run_tts.
        speed: Optional speed setting passed through to run_tts.
        prompt_text: Optional prompt transcript passed through to run_tts.
        prompt_speech: Optional prompt audio passed through to run_tts.

    Returns:
        Path of the final WAV, ./Audiobooks/<eBook base name>.wav.
    """
    # Step 1: Create chapters, ordered by the number embedded in the file name
    # (a plain string sort would put chapter_10 before chapter_2).
    chapters_dir = create_chapter_labeled_book(ebook_file_path)
    chapter_files = sorted(
        [os.path.join(chapters_dir, f) for f in os.listdir(chapters_dir) if f.startswith("chapter_") and f.endswith(".txt")],
        key=lambda x: int(re.findall(r'\d+', os.path.basename(x))[0])
    )
    output_dir = os.path.join(".", "Audiobooks")
    ensure_directory(output_dir)
    chapter_audio_files = []
    temp_audio_dir = os.path.join(".", "Working_files", "temp_audio")
    ensure_directory(temp_audio_dir)

    # Step 2: Synthesize each chapter, with a progress bar per level.
    for chapter_file in tqdm(chapter_files, desc="Processing Chapters"):
        chapter_name = os.path.basename(chapter_file)
        with open(chapter_file, 'r', encoding='utf-8') as f:
            text = f.read()
        sentences = sent_tokenize(text)
        fragment_audio_files = []
        counter = 0
        for sentence in tqdm(sentences, desc=f"Processing {chapter_name}", leave=False):
            for frag in split_long_sentence(sentence):
                if not frag:
                    continue
                frag_wav = run_tts(frag, model, prompt_text=prompt_text, prompt_speech=prompt_speech,
                                   gender=gender, pitch=pitch, speed=speed, save_dir=temp_audio_dir)
                new_frag_wav = os.path.join(temp_audio_dir, f"{chapter_name}_{counter}.wav")
                # os.replace overwrites an existing destination on every
                # platform. os.rename raises on Windows when a fragment from
                # an earlier run is still present in the temp directory.
                os.replace(frag_wav, new_frag_wav)
                fragment_audio_files.append(new_frag_wav)
                counter += 1
        chapter_audio = os.path.join(temp_audio_dir, f"{chapter_name}_combined.wav")
        combine_wav_files(fragment_audio_files, chapter_audio)
        chapter_audio_files.append(chapter_audio)

    # Step 3: Join the chapters, each followed by two seconds of silence.
    silence = AudioSegment.silent(duration=2000)
    final_audio = AudioSegment.empty()
    for chapter_audio in chapter_audio_files:
        final_audio += AudioSegment.from_wav(chapter_audio) + silence

    final_output = os.path.join(output_dir, os.path.splitext(os.path.basename(ebook_file_path))[0] + ".wav")
    final_audio.export(final_output, format="wav")
    return final_output
##########################
# Gradio UI Build Function
##########################
def build_ui(model_dir, device=0):
    """Build the three-tab Gradio interface around a single loaded model.

    The model is loaded once here and shared by the callbacks of the
    "Voice Clone", "Voice Creation" and "eBook Conversion" tabs.

    Args:
        model_dir: Directory containing the pretrained model.
        device: CUDA device index forwarded to initialize_model.

    Returns:
        The assembled ``gr.Blocks`` application (not yet launched).
    """
    # Initialize the model
    model = initialize_model(model_dir, device=device)

    def select_prompt_audio(uploaded, recorded):
        """Return the uploaded clip if given, else the recorded one, else None."""
        return uploaded or recorded or None

    # Voice Clone Tab callback
    def voice_clone(text, prompt_text, prompt_wav_upload, prompt_wav_record):
        # If a voice sample is provided, use it exclusively;
        # otherwise, fall back to a default controllable voice.
        prompt_speech = select_prompt_audio(prompt_wav_upload, prompt_wav_record)
        if prompt_speech is not None:
            # ignore gender when cloning from a sample
            return run_tts(text, model, prompt_text=prompt_text, prompt_speech=prompt_speech, gender=None)
        # Fallback: default male voice at the same level the sliders start at.
        # NOTE(review): the gender-controlled path of SparkTTS.inference appears
        # to require pitch and speed as well - confirm against cli/SparkTTS.py.
        default_level = LEVELS_MAP_UI[3]
        return run_tts(text, model, prompt_text=prompt_text, prompt_speech=None,
                       gender="male", pitch=default_level, speed=default_level)

    # Voice Creation Tab callback with slider value mapping
    def voice_creation(text, gender, pitch, speed):
        pitch_val = LEVELS_MAP_UI[int(pitch)]
        speed_val = LEVELS_MAP_UI[int(speed)]
        return run_tts(text, model, gender=gender, pitch=pitch_val, speed=speed_val)

    # eBook Conversion Tab callback with voice clone vs. creation logic
    def ebook_conversion(ebook_file, gender, pitch, speed, prompt_text, prompt_wav_upload, prompt_wav_record):
        # Without a file there is nothing to convert; show a readable message
        # in the UI instead of failing later with an unrelated error.
        if ebook_file is None:
            raise gr.Error("Please upload an eBook file before converting.")
        # If a prompt audio file is provided, use it exclusively.
        prompt_speech = select_prompt_audio(prompt_wav_upload, prompt_wav_record)
        if prompt_speech is not None:
            gender_used = None
            pitch_val = None
            speed_val = None
        else:
            gender_used = gender
            pitch_val = LEVELS_MAP_UI[int(pitch)]
            speed_val = LEVELS_MAP_UI[int(speed)]
        # The upload may arrive as an object with a .name path or as a plain path.
        ebook_file_path = ebook_file.name if hasattr(ebook_file, "name") else ebook_file
        return convert_ebook_to_audiobook(
            ebook_file_path, model,
            gender=gender_used, pitch=pitch_val, speed=speed_val,
            prompt_text=prompt_text, prompt_speech=prompt_speech
        )

    # Build the Gradio interface with three tabs
    with gr.Blocks() as demo:
        gr.HTML('<h1 style="text-align: center;">Spark-TTS by SparkAudio – Enhanced eBook Converter</h1>')
        with gr.Tabs():
            # Voice Clone Tab
            with gr.TabItem("Voice Clone"):
                gr.Markdown("### Upload reference audio or record a prompt")
                with gr.Row():
                    prompt_wav_upload = gr.Audio(sources="upload", type="filepath",
                                                 label="Upload Prompt Audio (>=16kHz)")
                    prompt_wav_record = gr.Audio(sources="microphone", type="filepath",
                                                 label="Record Prompt Audio")
                with gr.Row():
                    text_input = gr.Textbox(label="Text", lines=3, placeholder="Enter text")
                    prompt_text_input = gr.Textbox(label="Prompt Text (Optional)", lines=3,
                                                   placeholder="Enter prompt text")
                audio_output_clone = gr.Audio(label="Generated Audio", autoplay=True, streaming=True)
                btn_clone = gr.Button("Generate Voice Clone")
                btn_clone.click(
                    voice_clone,
                    inputs=[text_input, prompt_text_input, prompt_wav_upload, prompt_wav_record],
                    outputs=audio_output_clone
                )
            # Voice Creation Tab
            with gr.TabItem("Voice Creation"):
                gr.Markdown("### Create a custom voice")
                with gr.Row():
                    gender = gr.Radio(choices=["male", "female"], value="male", label="Gender")
                    pitch = gr.Slider(minimum=1, maximum=5, step=1, value=3, label="Pitch")
                    speed = gr.Slider(minimum=1, maximum=5, step=1, value=3, label="Speed")
                text_input_creation = gr.Textbox(label="Input Text", lines=3,
                                                 placeholder="Enter text",
                                                 value="Generate custom voice sample.")
                audio_output_creation = gr.Audio(label="Generated Audio", autoplay=True, streaming=True)
                btn_create = gr.Button("Create Voice")
                btn_create.click(
                    voice_creation,
                    inputs=[text_input_creation, gender, pitch, speed],
                    outputs=audio_output_creation
                )
            # eBook Conversion Tab
            with gr.TabItem("eBook Conversion"):
                gr.Markdown("### Convert an eBook into an Audiobook")
                ebook_file = gr.File(label="Upload eBook File (e.g., epub, mobi, pdf, txt)",
                                     file_types=[".epub", ".mobi", ".pdf", ".txt"])
                with gr.Row():
                    gender_ebook = gr.Radio(choices=["male", "female"], value="male", label="Gender")
                    pitch_ebook = gr.Slider(minimum=1, maximum=5, step=1, value=3, label="Pitch")
                    speed_ebook = gr.Slider(minimum=1, maximum=5, step=1, value=3, label="Speed")
                prompt_text_ebook = gr.Textbox(label="Prompt Text (Optional)", lines=3,
                                               placeholder="Enter prompt text for voice cloning")
                with gr.Row():
                    prompt_wav_upload_ebook = gr.Audio(sources="upload", type="filepath",
                                                       label="Upload Prompt Audio (>=16kHz)")
                    prompt_wav_record_ebook = gr.Audio(sources="microphone", type="filepath",
                                                       label="Record Prompt Audio")
                audio_output_ebook = gr.Audio(label="Generated Audiobook", autoplay=True, streaming=True)
                btn_ebook = gr.Button("Convert eBook")
                btn_ebook.click(
                    ebook_conversion,
                    inputs=[ebook_file, gender_ebook, pitch_ebook, speed_ebook, prompt_text_ebook,
                            prompt_wav_upload_ebook, prompt_wav_record_ebook],
                    outputs=audio_output_ebook
                )
    return demo
def parse_arguments():
    """Parse the command-line options of the converter.

    Returns:
        Namespace with ``model_dir``, ``device``, ``server_name`` and
        ``server_port``.
    """
    parser = argparse.ArgumentParser(description="Spark-TTS eBook Converter")
    # (flag, type, default, help) for every supported option.
    option_table = (
        ("--model_dir", str, "pretrained_models/Spark-TTS-0.5B", "Path to the model directory."),
        ("--device", int, 0, "GPU device id"),
        ("--server_name", str, "0.0.0.0", "Server host"),
        ("--server_port", int, 7860, "Server port"),
    )
    for flag, value_type, default_value, help_text in option_table:
        parser.add_argument(flag, type=value_type, default=default_value, help=help_text)
    return parser.parse_args()
if __name__ == "__main__":
    # Script entry point: read the CLI options, build the interface, then
    # serve it on the requested host and port.
    cli_options = parse_arguments()
    demo = build_ui(cli_options.model_dir, cli_options.device)
    demo.launch(server_port=cli_options.server_port, server_name=cli_options.server_name)