|
import argparse |
|
import logging |
|
import os |
|
import sys |
|
import shutil |
|
import zipfile |
|
import re |
|
import yt_dlp |
|
from pathlib import Path |
|
from urllib.parse import urlparse |
|
from functools import lru_cache |
|
import gradio as gr |
|
import requests |
|
from traceback import format_exc |
|
import asyncio |
|
|
|
|
|
|
|
# Send INFO-and-above log records to both the console and the file
# 'neorvc.log' (relative to the current working directory).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(), logging.FileHandler('neorvc.log')]
)
logger = logging.getLogger(__name__)

# The current working directory is used as the project root and appended to
# sys.path -- presumably so the project-local packages imported further down
# resolve when the app is launched from the repository root (TODO confirm).
BASE_DIR = os.getcwd()
sys.path.append(BASE_DIR)
|
|
|
# Import the two conversion pipelines and the prerequisites helper from the
# project packages; an ImportError is logged and re-raised as a gr.Error.
try:
    from neorvc.main_cli import song_cover_pipeline, vocal_cover_pipeline
    from rvc.rvc_cli import run_prerequisites_script
except ImportError as e:
    logger.error(f"Failed to import pipeline functions: {e}")
    raise gr.Error(f"Failed to load required modules. Please check installation: {e}")

# Run the prerequisites script once at import time with models=True and
# exe=False; any failure is logged and aborts module loading.
# NOTE(review): presumably this fetches the required model assets -- confirm
# against rvc.rvc_cli.run_prerequisites_script.
try:
    run_prerequisites_script(
        models=True,
        exe=False,
    )
except Exception as e:
    logger.error(f"Error running prerequisites: {e}")
    raise gr.Error(f"Failed to initialize prerequisites: {e}")
|
|
|
|
|
# Static application settings: directory names are relative to BASE_DIR and
# the upload limit is expressed in megabytes.
CONFIG = dict(
    models_dir="logs",
    output_dir="song_output",
    max_upload_size_mb=500,
)

# Absolute locations derived from the settings above.
RVC_MODELS_DIR = os.path.join(BASE_DIR, CONFIG['models_dir'])
OUTPUT_DIR = os.path.join(BASE_DIR, CONFIG['output_dir'])

# Root directory that validate_file_path() accepts paths under.
ALLOWED_DIR = os.path.abspath(BASE_DIR)

# Lower-case file extensions accepted for uploaded audio.
AUDIO_EXTS = ['.mp3', '.wav', '.flac', '.ogg', '.m4a']

# Create the models and output directories up front, in that order.
try:
    for _required_dir in (RVC_MODELS_DIR, OUTPUT_DIR):
        os.makedirs(_required_dir, exist_ok=True)
except OSError as e:
    logger.error(f"Error creating directories: {e}")
    raise gr.Error(f"Failed to create required directories: {e}")
|
|
|
@lru_cache(maxsize=1)
def get_current_models(models_dir):
    """Return the sorted names of the sub-directories of ``models_dir``.

    The entry named 'mute' and anything that is not a directory are skipped.
    The result is memoised per ``models_dir`` (one cached entry), so callers
    must invoke ``get_current_models.cache_clear()`` to see new models.

    Raises:
        gr.Error: If the directory cannot be listed.
    """
    try:
        found = []
        for entry in os.listdir(models_dir):
            if entry == 'mute':
                continue
            if os.path.isdir(os.path.join(models_dir, entry)):
                found.append(entry)
        found.sort()
        return found
    except OSError as e:
        logger.error(f"Error accessing models directory: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to list models. Check directory permissions: {e}")
|
|
|
def update_models_list():
    """Re-read the models directory and build a dropdown update.

    Returns:
        A ``(gr.update, status_message)`` pair. On failure the update carries
        an empty choice list and the message describes the error.
    """
    try:
        # Drop the memoised listing so newly added/removed models show up.
        get_current_models.cache_clear()
        available = get_current_models(RVC_MODELS_DIR)
        if available:
            return gr.update(choices=available, value=None), "Model list refreshed successfully."
        return gr.update(choices=[], value=None), "No models found in the directory."
    except Exception as e:
        logger.error(f"Error updating models list: {e}\n{format_exc()}")
        return gr.update(choices=[]), f"Failed to refresh models: {e}"
|
|
|
def sanitize_model_name(dir_name):
    """Validate a user-supplied model directory name.

    Args:
        dir_name: Proposed directory name.

    Returns:
        ``dir_name`` unchanged when it is non-empty and consists solely of
        ASCII letters, digits, underscores and hyphens.

    Raises:
        gr.Error: If the name is empty or contains any other character.
    """
    if not dir_name:
        raise gr.Error("Model name cannot be empty.")
    # fullmatch is used instead of match with '$': '$' also matches just
    # before a trailing newline, which would let a name like "abc\n" through.
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', dir_name):
        raise gr.Error("Invalid model name. Use alphanumeric characters, underscores, or hyphens only.")
    return dir_name
|
|
|
def validate_file_path(file_path):
    """Return the absolute form of ``file_path`` if it lies under ALLOWED_DIR.

    Raises:
        gr.Error: If the path is outside ALLOWED_DIR or cannot be processed.
            Every failure, including the "outside allowed directory" case,
            is logged and re-raised with an "Invalid file path:" prefix.
    """
    try:
        resolved = os.path.abspath(file_path)
        shared_root = os.path.commonpath([resolved, ALLOWED_DIR])
        if shared_root.startswith(ALLOWED_DIR):
            return resolved
        raise gr.Error("File path is outside allowed directory.")
    except Exception as e:
        logger.error(f"Invalid file path: {e}\n{format_exc()}")
        raise gr.Error(f"Invalid file path: {e}")
|
|
|
def extract_zip(extraction_folder, zip_path, progress=gr.Progress()):
    """Unpack a model archive and flatten it to the model/index files.

    Steps:
        1. Validate both paths, extract the archive into ``extraction_folder``
           and delete the archive.
        2. Walk the extracted tree looking for a ``.pth`` file larger than
           40 MiB and an ``.index`` file larger than 100 KiB (the last match
           of each kind wins).
        3. Move the selected files to the top of ``extraction_folder`` and
           remove every sub-directory.

    On any failure after extraction started, ``extraction_folder`` is removed.

    Returns:
        A confirmation message naming the extraction folder.

    Raises:
        gr.Error: On invalid paths, a bad archive, filesystem errors, or when
            no sufficiently large ``.pth`` file is present.
    """
    try:
        extraction_folder = validate_file_path(extraction_folder)
        zip_path = validate_file_path(zip_path)
        os.makedirs(extraction_folder, exist_ok=True)

        progress(0.2, desc="Extracting zip file...")
        with zipfile.ZipFile(zip_path, 'r') as archive:
            archive.extractall(extraction_folder)
        os.remove(zip_path)
    except (zipfile.BadZipFile, OSError) as e:
        shutil.rmtree(extraction_folder, ignore_errors=True)
        logger.error(f"Error extracting zip: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to extract zip file. Ensure it's a valid zip: {e}")

    min_index_bytes = 1024 * 100
    min_model_bytes = 1024 * 1024 * 40
    model_filepath = None
    index_filepath = None
    try:
        for folder, _subdirs, filenames in os.walk(extraction_folder):
            for filename in filenames:
                file_path = os.path.join(folder, filename)
                is_index = filename.endswith('.index')
                is_model = filename.endswith('.pth')
                if not (is_index or is_model):
                    continue
                try:
                    size = os.path.getsize(file_path)
                except OSError as sub_e:
                    # Unreadable entries are skipped rather than aborting.
                    logger.warning(f"Error accessing file {file_path}: {sub_e}")
                    continue
                if is_index:
                    if size > min_index_bytes:
                        index_filepath = file_path
                elif size > min_model_bytes:
                    model_filepath = file_path
    except Exception as e:
        logger.error(f"Error scanning extracted files: {e}\n{format_exc()}")
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"Failed to process extracted files: {e}")

    if not model_filepath:
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"No valid .pth model file found in {extraction_folder}. Ensure a model file (>40MB) is included.")

    try:
        # Hoist the chosen files to the top level of the model folder.
        for source in filter(None, (model_filepath, index_filepath)):
            destination = os.path.join(extraction_folder, os.path.basename(source))
            if source != destination:
                os.rename(source, destination)

        # Whatever directories remain are archive scaffolding; drop them.
        for entry in os.listdir(extraction_folder):
            entry_path = os.path.join(extraction_folder, entry)
            if os.path.isdir(entry_path):
                shutil.rmtree(entry_path, ignore_errors=True)
    except OSError as e:
        logger.error(f"Error organizing extracted files: {e}\n{format_exc()}")
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"Failed to organize model files: {e}")

    progress(1.0, desc="Zip extraction completed")
    return f"Model extracted to {extraction_folder}"
|
|
|
def download_online_model(url, dir_name, progress=gr.Progress()):
    """Download a zipped model from ``url`` and extract it into a new folder.

    Args:
        url: HTTP(S) link to the archive. Links containing 'pixeldrain.com'
            are rewritten to the pixeldrain file API endpoint.
        dir_name: Name of the model directory to create under RVC_MODELS_DIR;
            must pass sanitize_model_name.
        progress: Gradio progress tracker.

    Returns:
        A ``(result_message, status_message)`` tuple of strings.

    Raises:
        gr.Error: On invalid input, when the model directory already exists,
            or on any download/extraction failure.
    """
    # Both stay None until this call is about to create the corresponding
    # path. The error handlers therefore never reference an unbound name and
    # never delete a model directory that existed before the call.
    extraction_folder = None
    partial_zip = None
    try:
        dir_name = sanitize_model_name(dir_name)
        if not url:
            raise gr.Error("URL is required.")
        if not url.startswith(('http://', 'https://')):
            raise gr.Error("Invalid URL format. Must start with http:// or https://.")

        target_folder = os.path.join(RVC_MODELS_DIR, dir_name)
        if os.path.exists(target_folder):
            raise gr.Error(f"Model directory '{dir_name}' already exists! Choose a different name.")
        # From here on the folder belongs to this call and may be cleaned up.
        extraction_folder = target_folder

        progress(0.1, desc=f"Preparing to download '{dir_name}'...")
        zip_name = urlparse(url).path.split('/')[-1]
        if 'pixeldrain.com' in url:
            zip_name = os.path.basename(zip_name)
            url = f'https://pixeldrain.com/api/file/{zip_name}'
        if not zip_name:
            # A URL path ending in '/' would make zip_path equal OUTPUT_DIR.
            raise gr.Error("Could not determine a file name from the URL.")

        zip_path = os.path.join(OUTPUT_DIR, zip_name)
        progress(0.2, desc="Downloading model...")
        # The context manager releases the connection even if writing fails.
        with requests.get(url, stream=True, timeout=600) as response:
            response.raise_for_status()

            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0
            partial_zip = zip_path
            with open(zip_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total_size:
                            # Clamp: decoded bytes can exceed content-length.
                            fraction = min(1.0, downloaded / total_size)
                            progress(0.2 + 0.6 * fraction, desc="Downloading...")

        progress(0.8, desc="Extracting model...")
        extract_zip(extraction_folder, zip_path, progress)
        progress(1.0, desc="Download completed")
        return f"Model '{dir_name}' successfully downloaded and extracted!", "Model download completed."
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP error during download: {e}\n{format_exc()}")
        if extraction_folder:
            shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"Failed to download model: {e}") from e
    except Exception as e:
        logger.error(f"Download failed: {e}\n{format_exc()}")
        if extraction_folder:
            shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"Failed to download model: {e}") from e
    finally:
        # extract_zip removes the archive on success; this only clears a
        # partial or unextractable download left behind by a failure.
        if partial_zip and os.path.isfile(partial_zip):
            try:
                os.remove(partial_zip)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {partial_zip}: {cleanup_error}")
|
|
|
def upload_local_model(zip_file, dir_name, progress=gr.Progress()):
    """Extract an uploaded model archive into a new model folder.

    Args:
        zip_file: Uploaded file object; its ``name`` attribute is used as the
            path of the archive on disk.
        dir_name: Name of the model directory to create under RVC_MODELS_DIR;
            must pass sanitize_model_name.
        progress: Gradio progress tracker.

    Returns:
        A ``(result_message, status_message)`` tuple of strings.

    Raises:
        gr.Error: On invalid input, an oversized or missing file, an already
            existing model directory, or any extraction failure.
    """
    # Stays None until this call owns the folder, so the error handler never
    # hits an unbound name and never deletes a pre-existing model directory
    # (e.g. when the "already exists" check below fails).
    extraction_folder = None
    try:
        dir_name = sanitize_model_name(dir_name)
        if not zip_file:
            raise gr.Error("No file uploaded. Please select a zip file.")

        zip_path = zip_file.name
        if not os.path.exists(zip_path):
            raise gr.Error("Uploaded file not found. Please try again.")
        max_bytes = CONFIG['max_upload_size_mb'] * 1024 * 1024
        if os.path.getsize(zip_path) > max_bytes:
            raise gr.Error(f"File size exceeds {CONFIG['max_upload_size_mb']}MB limit.")

        target_folder = os.path.join(RVC_MODELS_DIR, dir_name)
        if os.path.exists(target_folder):
            raise gr.Error(f"Model directory '{dir_name}' already exists! Choose a different name.")
        extraction_folder = target_folder

        progress(0.5, desc="Processing uploaded file...")
        # NOTE(review): extract_zip validates zip_path against ALLOWED_DIR;
        # if uploads are stored outside the working directory (e.g. a system
        # temp dir) this call will be rejected -- confirm the upload location.
        extract_zip(extraction_folder, zip_path, progress)
        return f"Model '{dir_name}' successfully uploaded and extracted!", "Model upload completed."
    except Exception as e:
        logger.error(f"Upload failed: {e}\n{format_exc()}")
        if extraction_folder:
            shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"Failed to upload model: {e}") from e
|
|
|
def delete_model(model_name):
    """Delete one model directory located directly inside RVC_MODELS_DIR.

    Args:
        model_name: Name of the model folder to remove.

    Returns:
        A ``(result_message, status_message)`` tuple of strings.

    Raises:
        gr.Error: If no model is given, the name does not refer to a direct
            child of the models directory, the model does not exist, or the
            removal fails.
    """
    try:
        if not model_name:
            raise gr.Error("No model selected. Please choose a model to delete.")

        models_root = os.path.abspath(RVC_MODELS_DIR)
        model_path = os.path.abspath(os.path.join(models_root, model_name))
        # Only direct children of the models directory may be removed. This
        # rejects '..', nested paths and absolute paths, which would otherwise
        # let shutil.rmtree delete arbitrary directories.
        if os.path.dirname(model_path) != models_root:
            raise gr.Error(f"Invalid model name '{model_name}'.")
        if not os.path.exists(model_path):
            raise gr.Error(f"Model '{model_name}' not found.")

        shutil.rmtree(model_path)
        return f"Model '{model_name}' deleted successfully!", "Model deletion completed."
    except OSError as e:
        logger.error(f"Failed to delete model: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to delete model: {e}") from e
|
|
|
def swap_visibility():
    """Build the four UI updates used when switching input columns.

    Returns, in order: an update making one component visible, an update
    hiding another, an update resetting a value to ``''`` and an update
    resetting a value to ``None``. Which components receive them is decided
    by the caller's ``outputs`` wiring (not visible here).
    """
    reveal = gr.update(visible=True)
    conceal = gr.update(visible=False)
    clear_text = gr.update(value='')
    clear_file = gr.update(value=None)
    return reveal, conceal, clear_text, clear_file
|
|
|
def process_file_upload(file):
    """Validate an uploaded audio file by its extension.

    Args:
        file: Uploaded file object; its ``name`` attribute is the file path.

    Returns:
        ``(file_path, gr.update(value=file_path))``.

    Raises:
        gr.Error: If nothing was uploaded or the extension is not one of
            AUDIO_EXTS. Every failure is re-raised with a
            "Failed to process uploaded file:" prefix.
    """
    try:
        if not file:
            raise gr.Error("No file uploaded. Please select an audio file.")
        uploaded_path = file.name
        _, extension = os.path.splitext(uploaded_path)
        if extension.lower() in AUDIO_EXTS:
            return uploaded_path, gr.update(value=uploaded_path)
        raise gr.Error(f"Unsupported file format. Supported formats: {', '.join(AUDIO_EXTS)}")
    except Exception as e:
        logger.error(f"File upload processing failed: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to process uploaded file: {e}")
|
|
|
def show_hop_slider(pitch_detection_algo):
    """Return an update that is visible only when the algorithm is 'crepe'."""
    uses_crepe = pitch_detection_algo == 'crepe'
    return gr.update(visible=uses_crepe)
|
|
|
async def run_async_pipeline(pipeline, **kwargs):
    """Await ``pipeline(**kwargs)`` and return its result.

    The keyword arguments are logged at DEBUG level first. Any exception is
    logged with its traceback and then re-raised unchanged.
    """
    try:
        logger.debug(f"Running pipeline with kwargs: {kwargs}")
        outcome = await pipeline(**kwargs)
    except Exception as e:
        logger.error(f"Pipeline execution failed: {e}\n{format_exc()}")
        raise
    return outcome
|
|
|
async def generate_switch(
    song_input, rvc_model, pitch, keep_files, main_gain,
    backup_gain, inst_gain, index_rate, filter_radius, rms_mix_rate,
    f0_method, crepe_hop_length, protect, output_format, vocal_only,
    progress=gr.Progress()
):
    """Run the vocal-only or full song cover pipeline for one request.

    ``vocal_only`` selects ``vocal_cover_pipeline``; otherwise
    ``song_cover_pipeline`` is used. ``song_input`` is validated with
    validate_file_path only when it names an existing path; any other value
    is passed through untouched. All remaining arguments are forwarded to
    the pipeline as keyword arguments.

    Returns:
        ``(pipeline_result, gr.Info(...))``.

    Raises:
        gr.Error: On missing inputs, a yt_dlp download failure, or any other
            pipeline failure.
    """
    try:
        logger.info("Starting generate_switch with inputs: "
                    f"song_input={song_input}, rvc_model={rvc_model}, vocal_only={vocal_only}")

        # Both the source audio and the voice model are mandatory.
        if not song_input:
            raise gr.Error("Song input is required. Provide a YouTube link or file path.")
        if not rvc_model:
            raise gr.Error("Voice model is required. Select a model from the dropdown.")

        if vocal_only:
            pipeline = vocal_cover_pipeline
            pipeline_label = 'vocal_cover_pipeline'
        else:
            pipeline = song_cover_pipeline
            pipeline_label = 'song_cover_pipeline'
        logger.info(f"Selected pipeline: {pipeline_label}")

        # Only existing local paths are checked against the allowed root.
        if os.path.exists(song_input):
            song_input = validate_file_path(song_input)

        pipeline_args = {
            "song_input": song_input,
            "voice_model": rvc_model,
            "pitch_change": pitch,
            "keep_files": keep_files,
            "main_gain": main_gain,
            "backup_gain": backup_gain,
            "inst_gain": inst_gain,
            "index_rate": index_rate,
            "filter_radius": filter_radius,
            "rms_mix_rate": rms_mix_rate,
            "f0_method": f0_method,
            "crepe_hop_length": crepe_hop_length,
            "protect": protect,
            "output_format": output_format,
        }

        progress(0.1, desc="Initializing conversion...")
        result = await run_async_pipeline(pipeline, **pipeline_args)
        progress(1.0, desc="Conversion completed")
        logger.info("Pipeline execution completed successfully")
        return result, gr.Info("Conversion completed successfully.")
    except yt_dlp.utils.DownloadError as e:
        logger.error(f"YouTube download failed: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to download audio from YouTube. Check the URL or cookies file: {e}")
    except Exception as e:
        logger.error(f"Conversion failed: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to process conversion: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|