"""Gradio callbacks for managing RVC voice models and running cover pipelines."""

import argparse
import asyncio
import logging
import os
import re
import shutil
import sys
import zipfile
from functools import lru_cache
from pathlib import Path
from traceback import format_exc
from urllib.parse import urlparse

import gradio as gr
import requests
import yt_dlp

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(), logging.FileHandler('neorvc.log')]
)
logger = logging.getLogger(__name__)

# Base directory setup. The working directory is put on sys.path *before* the
# project imports below so that they can be resolved from it.
BASE_DIR = os.getcwd()
sys.path.append(BASE_DIR)

try:
    from neorvc.main_cli import song_cover_pipeline, vocal_cover_pipeline
    from rvc.rvc_cli import run_prerequisites_script
except ImportError as e:
    logger.error(f"Failed to import pipeline functions: {e}")
    raise gr.Error(f"Failed to load required modules. Please check installation: {e}") from e

# Run prerequisites
try:
    run_prerequisites_script(
        models=True,
        exe=False,
    )
except Exception as e:
    logger.error(f"Error running prerequisites: {e}")
    raise gr.Error(f"Failed to initialize prerequisites: {e}") from e

# Directory configuration
CONFIG = {
    "models_dir": "logs",
    "output_dir": "song_output",
    "max_upload_size_mb": 500
}
RVC_MODELS_DIR = os.path.join(BASE_DIR, CONFIG['models_dir'])
OUTPUT_DIR = os.path.join(BASE_DIR, CONFIG['output_dir'])
ALLOWED_DIR = os.path.abspath(BASE_DIR)
AUDIO_EXTS = ['.mp3', '.wav', '.flac', '.ogg', '.m4a']

# Ensure directories exist
try:
    os.makedirs(RVC_MODELS_DIR, exist_ok=True)
    os.makedirs(OUTPUT_DIR, exist_ok=True)
except OSError as e:
    logger.error(f"Error creating directories: {e}")
    raise gr.Error(f"Failed to create required directories: {e}") from e


@lru_cache(maxsize=1)
def get_current_models(models_dir):
    """Return the sorted names of the model directories inside *models_dir*.

    Only sub-directories are listed and the entry named ``mute`` is skipped.
    The result is memoised (one entry); call ``get_current_models.cache_clear()``
    to force a re-scan.

    Raises:
        gr.Error: if the directory cannot be listed.
    """
    try:
        models_list = [
            item for item in os.listdir(models_dir)
            if item != 'mute' and os.path.isdir(os.path.join(models_dir, item))
        ]
        return sorted(models_list)
    except OSError as e:
        logger.error(f"Error accessing models directory: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to list models. Check directory permissions: {e}") from e


def update_models_list():
    """Re-scan the models directory and build a dropdown update.

    Returns:
        A ``(gr.update, status_message)`` tuple. On failure the update carries
        an empty choice list and the message describes the error.
    """
    try:
        # Drop the memoised listing so newly added/removed models are seen.
        get_current_models.cache_clear()
        models = get_current_models(RVC_MODELS_DIR)
        if not models:
            return gr.update(choices=[], value=None), "No models found in the directory."
        return gr.update(choices=models, value=None), "Model list refreshed successfully."
    except Exception as e:
        logger.error(f"Error updating models list: {e}\n{format_exc()}")
        return gr.update(choices=[]), f"Failed to refresh models: {e}"


def sanitize_model_name(dir_name):
    """Validate a model directory name and return it unchanged.

    Raises:
        gr.Error: if the name is empty or contains anything other than ASCII
            letters, digits, underscores or hyphens.
    """
    if not dir_name:
        raise gr.Error("Model name cannot be empty.")
    if not re.match(r'^[a-zA-Z0-9_-]+$', dir_name):
        raise gr.Error("Invalid model name. Use alphanumeric characters, underscores, or hyphens only.")
    return dir_name


def validate_file_path(file_path):
    """Return the absolute form of *file_path* if it lies inside ``ALLOWED_DIR``.

    Raises:
        gr.Error: if the path is outside ``ALLOWED_DIR`` or cannot be evaluated
            (for example, paths on different drives on Windows).
    """
    try:
        file_path = os.path.abspath(file_path)
        # The common path of the two can never be longer than ALLOWED_DIR, so
        # the path is contained exactly when the common path *is* ALLOWED_DIR.
        if os.path.commonpath([file_path, ALLOWED_DIR]) != ALLOWED_DIR:
            raise gr.Error("File path is outside allowed directory.")
        return file_path
    except Exception as e:
        logger.error(f"Invalid file path: {e}\n{format_exc()}")
        raise gr.Error(f"Invalid file path: {e}") from e


def extract_zip(extraction_folder, zip_path, progress=gr.Progress()):
    """Extract a model archive and flatten it to the model/index files.

    The archive is unpacked into *extraction_folder* and then deleted. The
    tree is scanned for a ``.pth`` file larger than 40 MB (required) and an
    ``.index`` file larger than 100 KB (optional); those are moved to the top
    of *extraction_folder* and every sub-directory is removed.

    On any failure *extraction_folder* is removed before the error is raised.

    Returns:
        A human-readable status string.

    Raises:
        gr.Error: for invalid paths, a bad archive, or a missing model file.
    """
    try:
        extraction_folder = validate_file_path(extraction_folder)
        # NOTE(review): uploads handled by Gradio may live in a temp directory
        # outside BASE_DIR, in which case this check rejects them - confirm
        # against the deployment's upload directory.
        zip_path = validate_file_path(zip_path)
        os.makedirs(extraction_folder, exist_ok=True)
        progress(0.2, desc="Extracting zip file...")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extraction_folder)
        os.remove(zip_path)
    except (zipfile.BadZipFile, OSError) as e:
        shutil.rmtree(extraction_folder, ignore_errors=True)
        logger.error(f"Error extracting zip: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to extract zip file. Ensure it's a valid zip: {e}") from e

    index_filepath = model_filepath = None
    try:
        for root, _, files in os.walk(extraction_folder):
            for name in files:
                file_path = os.path.join(root, name)
                try:
                    # Size thresholds filter out stub/placeholder files.
                    if name.endswith('.index') and os.path.getsize(file_path) > 1024 * 100:
                        index_filepath = file_path
                    elif name.endswith('.pth') and os.path.getsize(file_path) > 1024 * 1024 * 40:
                        model_filepath = file_path
                except OSError as sub_e:
                    logger.warning(f"Error accessing file {file_path}: {sub_e}")
                    continue
    except Exception as e:
        logger.error(f"Error scanning extracted files: {e}\n{format_exc()}")
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"Failed to process extracted files: {e}") from e

    if not model_filepath:
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"No valid .pth model file found in {extraction_folder}. Ensure a model file (>40MB) is included.")

    try:
        # Move the selected files to the top level of the model folder ...
        for filepath in (model_filepath, index_filepath):
            if filepath:
                new_path = os.path.join(extraction_folder, os.path.basename(filepath))
                if filepath != new_path:
                    os.rename(filepath, new_path)
        # ... then drop whatever directory structure the archive carried.
        for item in os.listdir(extraction_folder):
            item_path = os.path.join(extraction_folder, item)
            if os.path.isdir(item_path):
                shutil.rmtree(item_path, ignore_errors=True)
    except OSError as e:
        logger.error(f"Error organizing extracted files: {e}\n{format_exc()}")
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"Failed to organize model files: {e}") from e

    progress(1.0, desc="Zip extraction completed")
    return f"Model extracted to {extraction_folder}"


def _discard_partial_model(folder, zip_path=None):
    """Best-effort removal of artefacts left behind by a failed model install.

    Both arguments may be ``None``; callers pass only the paths that the
    failing call itself created, so pre-existing data is never touched.
    """
    if folder:
        shutil.rmtree(folder, ignore_errors=True)
    if zip_path:
        try:
            os.remove(zip_path)
        except OSError:
            # Already gone (extract_zip deletes the archive after unpacking)
            # or not removable; cleanup is best-effort only.
            pass


def download_online_model(url, dir_name, progress=gr.Progress()):
    """Download and extract a model from a URL synchronously.

    Args:
        url: ``http://`` or ``https://`` link to a model zip. pixeldrain.com
            links are rewritten to the pixeldrain file API.
        dir_name: name of the new model directory (see ``sanitize_model_name``).
        progress: Gradio progress tracker.

    Returns:
        A ``(result_message, status_message)`` tuple.

    Raises:
        gr.Error: on invalid input, an existing model of the same name, or any
            download/extraction failure.
    """
    # Only paths created by *this* call are recorded here, so the error
    # handlers can never delete an existing model or reference an unbound name.
    created_folder = None
    partial_zip = None
    try:
        dir_name = sanitize_model_name(dir_name)
        if not url:
            raise gr.Error("URL is required.")
        if not url.startswith(('http://', 'https://')):
            raise gr.Error("Invalid URL format. Must start with http:// or https://.")

        extraction_folder = os.path.join(RVC_MODELS_DIR, dir_name)
        if os.path.exists(extraction_folder):
            raise gr.Error(f"Model directory '{dir_name}' already exists! Choose a different name.")
        created_folder = extraction_folder

        progress(0.1, desc=f"Preparing to download '{dir_name}'...")
        zip_name = urlparse(url).path.split('/')[-1]
        if 'pixeldrain.com' in url:
            zip_name = os.path.basename(zip_name)
            url = f'https://pixeldrain.com/api/file/{zip_name}'
        if not zip_name:
            # URL path ended with "/": fall back to a name derived from the model.
            zip_name = f"{dir_name}.zip"
        zip_path = os.path.join(OUTPUT_DIR, zip_name)

        progress(0.2, desc="Downloading model...")
        # Context manager guarantees the streamed connection is released.
        with requests.get(url, stream=True, timeout=600) as response:
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0
            with open(zip_path, 'wb') as f:
                partial_zip = zip_path
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total_size:
                            progress(0.2 + 0.6 * (downloaded / total_size), desc="Downloading...")

        progress(0.8, desc="Extracting model...")
        extract_zip(extraction_folder, zip_path, progress)
        progress(1.0, desc="Download completed")
        return f"Model '{dir_name}' successfully downloaded and extracted!", "Model download completed."
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP error during download: {e}\n{format_exc()}")
        _discard_partial_model(created_folder, partial_zip)
        raise gr.Error(f"Failed to download model: {e}") from e
    except Exception as e:
        logger.error(f"Download failed: {e}\n{format_exc()}")
        _discard_partial_model(created_folder, partial_zip)
        raise gr.Error(f"Failed to download model: {e}") from e


def upload_local_model(zip_file, dir_name, progress=gr.Progress()):
    """Upload and extract a local model zip file.

    Args:
        zip_file: uploaded file object; its ``.name`` attribute is used as the
            path of the archive on disk.
        dir_name: name of the new model directory (see ``sanitize_model_name``).
        progress: Gradio progress tracker.

    Returns:
        A ``(result_message, status_message)`` tuple.

    Raises:
        gr.Error: on invalid input, an oversized file, an existing model of the
            same name, or an extraction failure.
    """
    # Set only once we know the folder does not exist yet, so the error
    # handler cannot delete an existing model or hit an unbound name.
    created_folder = None
    try:
        dir_name = sanitize_model_name(dir_name)
        if not zip_file:
            raise gr.Error("No file uploaded. Please select a zip file.")
        zip_path = zip_file.name
        if not os.path.exists(zip_path):
            raise gr.Error("Uploaded file not found. Please try again.")
        if os.path.getsize(zip_path) > CONFIG['max_upload_size_mb'] * 1024 * 1024:
            raise gr.Error(f"File size exceeds {CONFIG['max_upload_size_mb']}MB limit.")

        extraction_folder = os.path.join(RVC_MODELS_DIR, dir_name)
        if os.path.exists(extraction_folder):
            raise gr.Error(f"Model directory '{dir_name}' already exists! Choose a different name.")
        created_folder = extraction_folder

        progress(0.5, desc="Processing uploaded file...")
        extract_zip(extraction_folder, zip_path, progress)
        return f"Model '{dir_name}' successfully uploaded and extracted!", "Model upload completed."
    except Exception as e:
        logger.error(f"Upload failed: {e}\n{format_exc()}")
        _discard_partial_model(created_folder)
        raise gr.Error(f"Failed to upload model: {e}") from e


def delete_model(model_name):
    """Delete a model directory.

    Args:
        model_name: name of a directory directly inside ``RVC_MODELS_DIR``.

    Returns:
        A ``(result_message, status_message)`` tuple.

    Raises:
        gr.Error: if no model is given, the name does not refer to a direct
            child of the models directory, the model does not exist, or the
            removal fails.
    """
    try:
        if not model_name:
            raise gr.Error("No model selected. Please choose a model to delete.")
        models_root = os.path.abspath(RVC_MODELS_DIR)
        model_path = os.path.abspath(os.path.join(models_root, model_name))
        # Reject "..", ".", absolute paths and nested paths: rmtree must only
        # ever be pointed at a direct child of the models directory.
        if os.path.dirname(model_path) != models_root:
            raise gr.Error("Invalid model name.")
        if not os.path.exists(model_path):
            raise gr.Error(f"Model '{model_name}' not found.")
        shutil.rmtree(model_path)
        return f"Model '{model_name}' deleted successfully!", "Model deletion completed."
    except OSError as e:
        logger.error(f"Failed to delete model: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to delete model: {e}") from e


def swap_visibility():
    """Toggle visibility of YouTube link and file upload columns.

    Returns four updates in order: show, hide, reset a value to ``''`` and
    reset a value to ``None``. Which components receive them is decided by
    the event wiring, which is not part of this section.
    """
    return gr.update(visible=True), gr.update(visible=False), gr.update(value=''), gr.update(value=None)


def process_file_upload(file):
    """Handle file upload and update UI.

    Returns:
        ``(file_path, gr.update(value=file_path))`` for a supported audio file.

    Raises:
        gr.Error: if nothing was uploaded or the extension is not in
            ``AUDIO_EXTS``.
    """
    try:
        if not file:
            raise gr.Error("No file uploaded. Please select an audio file.")
        file_path = file.name
        if os.path.splitext(file_path)[1].lower() not in AUDIO_EXTS:
            raise gr.Error(f"Unsupported file format. Supported formats: {', '.join(AUDIO_EXTS)}")
        return file_path, gr.update(value=file_path)
    except Exception as e:
        logger.error(f"File upload processing failed: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to process uploaded file: {e}") from e


def show_hop_slider(pitch_detection_algo):
    """Show/hide crepe hop length slider based on pitch detection algorithm."""
    return gr.update(visible=pitch_detection_algo == 'crepe')


async def run_async_pipeline(pipeline, **kwargs):
    """Await ``pipeline(**kwargs)`` and return its result.

    Failures are logged with a traceback and re-raised unchanged.
    """
    try:
        logger.debug(f"Running pipeline with kwargs: {kwargs}")
        return await pipeline(**kwargs)
    except Exception as e:
        logger.error(f"Pipeline execution failed: {e}\n{format_exc()}")
        raise


async def generate_switch(
    song_input, rvc_model, pitch, keep_files, main_gain, backup_gain, inst_gain,
    index_rate, filter_radius, rms_mix_rate, f0_method, crepe_hop_length,
    protect, output_format, vocal_only, progress=gr.Progress()
):
    """Run the appropriate pipeline based on vocal_only flag.

    ``vocal_cover_pipeline`` is used when *vocal_only* is truthy, otherwise
    ``song_cover_pipeline``; the remaining arguments are forwarded to it as
    keyword arguments.

    Returns:
        ``(pipeline_result, gr.Info(...))``.

    Raises:
        gr.Error: if an input is missing, the YouTube download fails, or the
            pipeline raises.
    """
    try:
        logger.info("Starting generate_switch with inputs: "
                    f"song_input={song_input}, rvc_model={rvc_model}, vocal_only={vocal_only}")

        # Validate inputs first
        if not song_input:
            raise gr.Error("Song input is required. Provide a YouTube link or file path.")
        if not rvc_model:
            raise gr.Error("Voice model is required. Select a model from the dropdown.")

        # Select pipeline
        pipeline = vocal_cover_pipeline if vocal_only else song_cover_pipeline
        logger.info(f"Selected pipeline: {'vocal_cover_pipeline' if vocal_only else 'song_cover_pipeline'}")

        # Validate song_input path if it's a local file; anything that does
        # not exist on disk (e.g. a URL) is passed through untouched.
        song_input = validate_file_path(song_input) if os.path.exists(song_input) else song_input

        # Run the pipeline
        progress(0.1, desc="Initializing conversion...")
        result = await run_async_pipeline(
            pipeline,
            song_input=song_input,
            voice_model=rvc_model,
            pitch_change=pitch,
            keep_files=keep_files,
            main_gain=main_gain,
            backup_gain=backup_gain,
            inst_gain=inst_gain,
            index_rate=index_rate,
            filter_radius=filter_radius,
            rms_mix_rate=rms_mix_rate,
            f0_method=f0_method,
            crepe_hop_length=crepe_hop_length,
            protect=protect,
            output_format=output_format
        )
        progress(1.0, desc="Conversion completed")
        logger.info("Pipeline execution completed successfully")
        # NOTE(review): gr.Info is called for its notification side effect and
        # its return value becomes the second output - confirm that is what
        # the bound output component expects.
        return result, gr.Info("Conversion completed successfully.")
    except yt_dlp.utils.DownloadError as e:
        logger.error(f"YouTube download failed: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to download audio from YouTube. Check the URL or cookies file: {e}") from e
    except Exception as e:
        logger.error(f"Conversion failed: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to process conversion: {e}") from e

# endcode UwU