neorvc / tabs /typing_extra.py
NeoPy's picture
Upload folder using huggingface_hub
1c7d911 verified
import argparse
import logging
import os
import sys
import shutil
import zipfile
import re
import yt_dlp
from pathlib import Path
from urllib.parse import urlparse
from functools import lru_cache
import gradio as gr
import requests
from traceback import format_exc
import asyncio
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler(), logging.FileHandler('neorvc.log')]
)
logger = logging.getLogger(__name__)
# Base directory setup
BASE_DIR = os.getcwd()
sys.path.append(BASE_DIR)
try:
from neorvc.main_cli import song_cover_pipeline, vocal_cover_pipeline
from rvc.rvc_cli import run_prerequisites_script
except ImportError as e:
logger.error(f"Failed to import pipeline functions: {e}")
raise gr.Error(f"Failed to load required modules. Please check installation: {e}")
# Run prerequisites
try:
run_prerequisites_script(
models=True,
exe=False,
)
except Exception as e:
logger.error(f"Error running prerequisites: {e}")
raise gr.Error(f"Failed to initialize prerequisites: {e}")
# Directory configuration
CONFIG = {
"models_dir": "logs",
"output_dir": "song_output",
"max_upload_size_mb": 500
}
RVC_MODELS_DIR = os.path.join(BASE_DIR, CONFIG['models_dir'])
OUTPUT_DIR = os.path.join(BASE_DIR, CONFIG['output_dir'])
ALLOWED_DIR = os.path.abspath(BASE_DIR)
AUDIO_EXTS = ['.mp3', '.wav', '.flac', '.ogg', '.m4a']
# Ensure directories exist
try:
os.makedirs(RVC_MODELS_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
except OSError as e:
logger.error(f"Error creating directories: {e}")
raise gr.Error(f"Failed to create required directories: {e}")
@lru_cache(maxsize=1)
def get_current_models(models_dir):
"""Retrieve list of model directories, excluding specific items."""
try:
models_list = [item for item in os.listdir(models_dir) if item != 'mute' and os.path.isdir(os.path.join(models_dir, item))]
return sorted(models_list)
except OSError as e:
logger.error(f"Error accessing models directory: {e}\n{format_exc()}")
raise gr.Error(f"Failed to list models. Check directory permissions: {e}")
def update_models_list():
"""Update the dropdown list of available models."""
try:
get_current_models.cache_clear()
models = get_current_models(RVC_MODELS_DIR)
if not models:
return gr.update(choices=[], value=None), "No models found in the directory."
return gr.update(choices=models, value=None), "Model list refreshed successfully."
except Exception as e:
logger.error(f"Error updating models list: {e}\n{format_exc()}")
return gr.update(choices=[]), f"Failed to refresh models: {e}"
def sanitize_model_name(dir_name):
"""Sanitize model name to prevent invalid characters."""
if not dir_name:
raise gr.Error("Model name cannot be empty.")
if not re.match(r'^[a-zA-Z0-9_-]+$', dir_name):
raise gr.Error("Invalid model name. Use alphanumeric characters, underscores, or hyphens only.")
return dir_name
def validate_file_path(file_path):
"""Ensure file path is within allowed directory."""
try:
file_path = os.path.abspath(file_path)
if not os.path.commonpath([file_path, ALLOWED_DIR]).startswith(ALLOWED_DIR):
raise gr.Error("File path is outside allowed directory.")
return file_path
except Exception as e:
logger.error(f"Invalid file path: {e}\n{format_exc()}")
raise gr.Error(f"Invalid file path: {e}")
def extract_zip(extraction_folder, zip_path, progress=gr.Progress()):
"""Extract zip file and organize model files."""
try:
extraction_folder = validate_file_path(extraction_folder)
zip_path = validate_file_path(zip_path)
os.makedirs(extraction_folder, exist_ok=True)
progress(0.2, desc="Extracting zip file...")
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extraction_folder)
os.remove(zip_path)
except (zipfile.BadZipFile, OSError) as e:
shutil.rmtree(extraction_folder, ignore_errors=True)
logger.error(f"Error extracting zip: {e}\n{format_exc()}")
raise gr.Error(f"Failed to extract zip file. Ensure it's a valid zip: {e}")
index_filepath = model_filepath = None
try:
for root, _, files in os.walk(extraction_folder):
for name in files:
file_path = os.path.join(root, name)
try:
if name.endswith('.index') and os.path.getsize(file_path) > 1024 * 100:
index_filepath = file_path
elif name.endswith('.pth') and os.path.getsize(file_path) > 1024 * 1024 * 40:
model_filepath = file_path
except OSError as sub_e:
logger.warning(f"Error accessing file {file_path}: {sub_e}")
continue
except Exception as e:
logger.error(f"Error scanning extracted files: {e}\n{format_exc()}")
shutil.rmtree(extraction_folder, ignore_errors=True)
raise gr.Error(f"Failed to process extracted files: {e}")
if not model_filepath:
shutil.rmtree(extraction_folder, ignore_errors=True)
raise gr.Error(f"No valid .pth model file found in {extraction_folder}. Ensure a model file (>40MB) is included.")
try:
for filepath in (model_filepath, index_filepath):
if filepath:
new_path = os.path.join(extraction_folder, os.path.basename(filepath))
if filepath != new_path:
os.rename(filepath, new_path)
for item in os.listdir(extraction_folder):
item_path = os.path.join(extraction_folder, item)
if os.path.isdir(item_path):
shutil.rmtree(item_path, ignore_errors=True)
except OSError as e:
logger.error(f"Error organizing extracted files: {e}\n{format_exc()}")
shutil.rmtree(extraction_folder, ignore_errors=True)
raise gr.Error(f"Failed to organize model files: {e}")
progress(1.0, desc="Zip extraction completed")
return f"Model extracted to {extraction_folder}"
def download_online_model(url, dir_name, progress=gr.Progress()):
"""Download and extract a model from a URL synchronously."""
try:
dir_name = sanitize_model_name(dir_name)
if not url:
raise gr.Error("URL is required.")
if not url.startswith(('http://', 'https://')):
raise gr.Error("Invalid URL format. Must start with http:// or https://.")
extraction_folder = os.path.join(RVC_MODELS_DIR, dir_name)
if os.path.exists(extraction_folder):
raise gr.Error(f"Model directory '{dir_name}' already exists! Choose a different name.")
progress(0.1, desc=f"Preparing to download '{dir_name}'...")
zip_name = urlparse(url).path.split('/')[-1]
if 'pixeldrain.com' in url:
zip_name = os.path.basename(zip_name)
url = f'https://pixeldrain.com/api/file/{zip_name}'
zip_path = os.path.join(OUTPUT_DIR, zip_name)
progress(0.2, desc="Downloading model...")
response = requests.get(url, stream=True, timeout=600)
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
with open(zip_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size:
progress(0.2 + 0.6 * (downloaded / total_size), desc="Downloading...")
progress(0.8, desc="Extracting model...")
extract_zip(extraction_folder, zip_path, progress)
progress(1.0, desc="Download completed")
return f"Model '{dir_name}' successfully downloaded and extracted!", "Model download completed."
except requests.exceptions.RequestException as e:
logger.error(f"HTTP error during download: {e}\n{format_exc()}")
shutil.rmtree(extraction_folder, ignore_errors=True)
raise gr.Error(f"Failed to download model: {e}")
except Exception as e:
logger.error(f"Download failed: {e}\n{format_exc()}")
shutil.rmtree(extraction_folder, ignore_errors=True)
raise gr.Error(f"Failed to download model: {e}")
def upload_local_model(zip_file, dir_name, progress=gr.Progress()):
"""Upload and extract a local model zip file."""
try:
dir_name = sanitize_model_name(dir_name)
if not zip_file:
raise gr.Error("No file uploaded. Please select a zip file.")
zip_path = zip_file.name
if not os.path.exists(zip_path):
raise gr.Error("Uploaded file not found. Please try again.")
if os.path.getsize(zip_path) > CONFIG['max_upload_size_mb'] * 1024 * 1024:
raise gr.Error(f"File size exceeds {CONFIG['max_upload_size_mb']}MB limit.")
extraction_folder = os.path.join(RVC_MODELS_DIR, dir_name)
if os.path.exists(extraction_folder):
raise gr.Error(f"Model directory '{dir_name}' already exists! Choose a different name.")
progress(0.5, desc="Processing uploaded file...")
extract_zip(extraction_folder, zip_path, progress)
return f"Model '{dir_name}' successfully uploaded and extracted!", "Model upload completed."
except Exception as e:
logger.error(f"Upload failed: {e}\n{format_exc()}")
shutil.rmtree(extraction_folder, ignore_errors=True)
raise gr.Error(f"Failed to upload model: {e}")
def delete_model(model_name):
"""Delete a model directory."""
try:
if not model_name:
raise gr.Error("No model selected. Please choose a model to delete.")
model_path = os.path.join(RVC_MODELS_DIR, model_name)
if not os.path.exists(model_path):
raise gr.Error(f"Model '{model_name}' not found.")
shutil.rmtree(model_path)
return f"Model '{model_name}' deleted successfully!", "Model deletion completed."
except OSError as e:
logger.error(f"Failed to delete model: {e}\n{format_exc()}")
raise gr.Error(f"Failed to delete model: {e}")
def swap_visibility():
"""Toggle visibility of YouTube link and file upload columns."""
return gr.update(visible=True), gr.update(visible=False), gr.update(value=''), gr.update(value=None)
def process_file_upload(file):
"""Handle file upload and update UI."""
try:
if not file:
raise gr.Error("No file uploaded. Please select an audio file.")
file_path = file.name
if os.path.splitext(file_path)[1].lower() not in AUDIO_EXTS:
raise gr.Error(f"Unsupported file format. Supported formats: {', '.join(AUDIO_EXTS)}")
return file_path, gr.update(value=file_path)
except Exception as e:
logger.error(f"File upload processing failed: {e}\n{format_exc()}")
raise gr.Error(f"Failed to process uploaded file: {e}")
def show_hop_slider(pitch_detection_algo):
"""Show/hide crepe hop length slider based on pitch detection algorithm."""
return gr.update(visible=pitch_detection_algo == 'crepe')
async def run_async_pipeline(pipeline, **kwargs):
"""Run an async pipeline."""
try:
logger.debug(f"Running pipeline with kwargs: {kwargs}")
return await pipeline(**kwargs)
except Exception as e:
logger.error(f"Pipeline execution failed: {e}\n{format_exc()}")
raise
async def generate_switch(
song_input, rvc_model, pitch, keep_files, main_gain,
backup_gain, inst_gain, index_rate, filter_radius, rms_mix_rate,
f0_method, crepe_hop_length, protect, output_format, vocal_only,
progress=gr.Progress()
):
"""Run the appropriate pipeline based on vocal_only flag."""
try:
logger.info("Starting generate_switch with inputs: "
f"song_input={song_input}, rvc_model={rvc_model}, vocal_only={vocal_only}")
# Validate inputs first
if not song_input:
raise gr.Error("Song input is required. Provide a YouTube link or file path.")
if not rvc_model:
raise gr.Error("Voice model is required. Select a model from the dropdown.")
# Select pipeline
pipeline = vocal_cover_pipeline if vocal_only else song_cover_pipeline
logger.info(f"Selected pipeline: {'vocal_cover_pipeline' if vocal_only else 'song_cover_pipeline'}")
# Validate song_input path if it's a local file
song_input = validate_file_path(song_input) if os.path.exists(song_input) else song_input
# Run the pipeline
progress(0.1, desc="Initializing conversion...")
result = await run_async_pipeline(
pipeline,
song_input=song_input,
voice_model=rvc_model,
pitch_change=pitch,
keep_files=keep_files,
main_gain=main_gain,
backup_gain=backup_gain,
inst_gain=inst_gain,
index_rate=index_rate,
filter_radius=filter_radius,
rms_mix_rate=rms_mix_rate,
f0_method=f0_method,
crepe_hop_length=crepe_hop_length,
protect=protect,
output_format=output_format
)
progress(1.0, desc="Conversion completed")
logger.info("Pipeline execution completed successfully")
return result, gr.Info("Conversion completed successfully.")
except yt_dlp.utils.DownloadError as e:
logger.error(f"YouTube download failed: {e}\n{format_exc()}")
raise gr.Error(f"Failed to download audio from YouTube. Check the URL or cookies file: {e}")
except Exception as e:
logger.error(f"Conversion failed: {e}\n{format_exc()}")
raise gr.Error(f"Failed to process conversion: {e}")
# endcode UwU