Spaces:
Runtime error
Runtime error
""" | |
app.py β unified FastAPI backend | |
* /voiceβconvert β RVC voice conversion | |
* /uvr-remove β UVR vocalΒ / instrumental separation | |
Run: | |
uvicorn app:app --host 0.0.0.0 --port 8000 | |
""" | |
import asyncio | |
import mimetypes | |
import shutil | |
import subprocess | |
import tempfile | |
import uuid | |
import zipfile | |
from pathlib import Path | |
from typing import List | |
import httpx | |
from fastapi import BackgroundTasks, FastAPI, HTTPException | |
from fastapi.responses import FileResponse | |
from pydantic import BaseModel, HttpUrl, conint | |
from fastapi.responses import JSONResponse | |
import base64 | |
# βββββββββββββββ RVC IMPORT (lazy singleton) ββββββββββββββββββββββββββββββββ | |
# Module-wide RVC converter object, shared by every request handled by this process.
# NOTE(review): the section banner calls this a "lazy singleton", but the call
# below executes at import time — confirm whether import_voice_converter()
# itself defers the expensive model loading.
from rvc_cli import import_voice_converter  # change if your module is named differently
converter = import_voice_converter()
# βββββββββββββββ FASTAPI APP ββββββββββββββββββββββββββββββββββββββββββββββββ | |
# ASGI application object; `uvicorn app:app` serves this instance.
# The title/description strings are shown in the generated OpenAPI docs, so
# they are kept as plain ASCII (the previous values contained mis-decoded
# non-breaking spaces).
app = FastAPI(
    title="Audio AI Backend",
    version="2.0.0",
    description="Voice conversion (RVC) + vocal removal (UVR)",
)
# βββββββββββββββββββββββ COMMON HELPERS βββββββββββββββββββββββββββββββββββββ | |
async def _download(
    url: str,
    dest: Path,
    *,
    retries: int = 3,
    chunk_size: int = 1 << 18,  # 256 KiB
    connect_timeout: float = 10.0,
    read_timeout: float = 10.0,
) -> None:
    """Stream *url* into the file *dest*, retrying transient network failures.

    * The body is written chunk by chunk, so memory use does not grow with
      the size of the download.
    * Redirects are followed.
    * Timeouts and transport errors are retried with exponential back-off
      (1 s, 2 s, 4 s, ...).
    * An upstream status >= 400 is NOT retried.

    Args:
        url: Address to fetch with a GET request.
        dest: Target file; its parent directory is created when missing.
        retries: Total number of attempts. Values below 1 are treated as 1 so
            that the function can never return without having tried.
        chunk_size: Size passed to ``aiter_bytes`` for each read.
        connect_timeout: Default httpx timeout (seconds) for every phase
            except reading.
        read_timeout: httpx read timeout in seconds.

    Raises:
        HTTPException: status 400 when upstream answers with >= 400, or when
            every attempt failed with a timeout / transport error.
    """
    attempts = max(1, retries)
    dest.parent.mkdir(parents=True, exist_ok=True)
    timeout_cfg = httpx.Timeout(connect_timeout, read=read_timeout)
    backoff = 1.0

    for attempt in range(1, attempts + 1):
        try:
            async with httpx.AsyncClient(timeout=timeout_cfg, follow_redirects=True) as client:
                async with client.stream("GET", url) as resp:
                    if resp.status_code >= 400:
                        raise HTTPException(
                            status_code=400,
                            detail=f"Upstream returned {resp.status_code} for {url}",
                        )
                    with dest.open("wb") as fp:
                        async for chunk in resp.aiter_bytes(chunk_size):
                            fp.write(chunk)
            return  # success
        except (httpx.TimeoutException, httpx.TransportError) as exc:
            # Never leave a truncated download behind for later steps to read.
            dest.unlink(missing_ok=True)
            if attempt == attempts:
                raise HTTPException(
                    status_code=400,
                    detail=f"Failed to fetch {url} after {attempts} attempts: {exc}",
                ) from exc
            await asyncio.sleep(backoff)
            backoff *= 2  # exponential back-off
def _ensure_wav(src: Path, work_dir: Path) -> Path: | |
"""If *src* isnβt WAV, transcode with FFmpeg β 48Β kHzΒ stereo WAV.""" | |
if src.suffix.lower() == ".wav": | |
return src | |
ctype, _ = mimetypes.guess_type(src.name) | |
if not (ctype or "").startswith("audio"): | |
raise HTTPException(status_code=400, detail="Input is not an audio file") | |
dst = work_dir / f"{src.stem}_converted.wav" | |
cmd = ["ffmpeg", "-y", "-i", str(src), "-ar", "48000", "-ac", "2", str(dst)] | |
try: | |
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
except (subprocess.CalledProcessError, FileNotFoundError): | |
raise HTTPException( | |
status_code=500, | |
detail="FFmpeg failed or is missing on the server.", | |
) | |
return dst | |
# ββββββββββββββββββββββββ RVC VOICEβCONVERT βββββββββββββββββββββββββββββββββ | |
# Request body accepted by the voice-conversion handler (`voice_convert`).
class VoiceConversionRequest(BaseModel):
    # Forwarded as `pitch=` to converter.convert_audio; validated to the
    # inclusive range -24..24.  Presumably semitones — TODO confirm in rvc_cli.
    pitch: conint(ge=-24, le=24)
    input_url: HttpUrl  # audio (wav/mp3/flac...)
    model_url: HttpUrl  # ZIP that holds *.pth & *.index
def _voice_convert(
    wav_in: Path, wav_out: Path, pth_file: Path, index_file: Path, pitch: int
) -> None:
    """Run one synchronous conversion through the module-level ``converter``.

    The four paths are handed over as strings; every other option is a fixed
    value chosen by this service.  The call blocks until ``convert_audio``
    returns, so async callers should run it in a worker thread.
    """
    # Per-request values.
    request_args = {
        "audio_input_path": str(wav_in),
        "audio_output_path": str(wav_out),
        "model_path": str(pth_file),
        "index_path": str(index_file),
        "pitch": pitch,
    }
    # Service-wide constants (meaning defined by rvc_cli, not by this file).
    fixed_args = {
        "filter_radius": 3,
        "index_rate": 0.3,
        "volume_envelope": 1.0,
        "protect": 0.33,
        "hop_length": 128,
        "f0_method": "rmvpe",
        "split_audio": False,
        "export_format": "WAV",
        "embedder_model": "contentvec",
        "sid": 0,
    }
    converter.convert_audio(**request_args, **fixed_args)
# βββ 2. VOICEβCONVERT ENDPOINT βββββββββββββββββββββββββββββββββββββββββββββ | |
import zipfile, itertools | |
def _extract_rvc_model(model_zip: Path, extract_dir: Path) -> tuple[Path, Path]:
    """Unpack *model_zip* into *extract_dir* and return ``(pth, index)`` paths.

    The first ``*.pth`` and first ``*.index`` found anywhere below
    *extract_dir* are used, so archives with nested folders work.

    Raises:
        HTTPException: 400 when the archive is not a valid ZIP or lacks
            either file type.
    """
    extract_dir.mkdir(parents=True, exist_ok=True)
    try:
        with zipfile.ZipFile(model_zip) as zf:
            zf.extractall(extract_dir)
    except zipfile.BadZipFile as exc:
        raise HTTPException(status_code=400, detail="Uploaded model is not a valid ZIP") from exc

    pth_path = next(extract_dir.rglob("*.pth"), None)
    index_path = next(extract_dir.rglob("*.index"), None)
    if pth_path is None or index_path is None:
        raise HTTPException(status_code=400, detail="ZIP does not contain .pth and .index")
    return pth_path, index_path


# NOTE(review): no route decorator (e.g. @app.post(...)) is visible on this
# handler in the reviewed text — confirm it is registered with `app` somewhere.
async def voice_convert(req: VoiceConversionRequest, background: BackgroundTasks):
    """Download audio + model, run RVC conversion and return the result as WAV.

    Steps: download the input audio and the model ZIP concurrently, extract
    the ZIP, transcode the input to WAV if necessary, run the conversion in a
    worker thread, and stream the output file back.

    All artefacts live in one temporary directory.  On success it is removed
    by a background task after the response has been sent; on any failure it
    is removed immediately, because background tasks are not executed when
    the handler raises.

    Raises:
        HTTPException: 400 for download / ZIP / input-type problems,
            500 when FFmpeg or the conversion itself fails.
    """
    tmp = Path(tempfile.mkdtemp(prefix="rvc_"))
    try:
        # a. download audio & model ZIP.  The audio gets its own sub-directory
        #    so its (remote-controlled) name can never clash with "model.zip"
        #    or the extraction folder; a URL without a file name falls back to
        #    a fixed placeholder instead of resolving to the directory itself.
        input_name = Path(req.input_url.path or "").name or "input_audio"
        wav_src = tmp / "input" / input_name
        wav_src.parent.mkdir(parents=True, exist_ok=True)
        model_zip = tmp / "model.zip"

        # return_exceptions=True lets both downloads finish before we react,
        # so no download is still writing into `tmp` while it is being deleted.
        results = await asyncio.gather(
            _download(str(req.input_url), wav_src),
            _download(str(req.model_url), model_zip),
            return_exceptions=True,
        )
        for outcome in results:
            if isinstance(outcome, BaseException):
                raise outcome

        # b. extract ZIP (blocking I/O -> worker thread)
        pth_path, index_path = await asyncio.to_thread(
            _extract_rvc_model, model_zip, tmp / "model"
        )

        # c. make sure input is WAV (FFmpeg blocks -> worker thread)
        wav_for_rvc = await asyncio.to_thread(_ensure_wav, wav_src, tmp)
        out_wav = tmp / f"{wav_for_rvc.stem}_output.wav"

        # d. run conversion in a worker thread
        try:
            await asyncio.to_thread(
                _voice_convert, wav_for_rvc, out_wav, pth_path, index_path, req.pitch
            )
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Conversion failed: {e}") from e
    except BaseException:
        shutil.rmtree(tmp, ignore_errors=True)
        raise

    # e. everything lives under `tmp`, so one rmtree after the response is
    #    sent cleans up every artefact.
    background.add_task(shutil.rmtree, tmp, ignore_errors=True)
    return FileResponse(
        path=out_wav,
        media_type="audio/wav",
        filename=f"{uuid.uuid4().hex}.wav",
        background=background,
    )
# ββββββββββββββββββββββββββ UVR VOCALΒ REMOVAL βββββββββββββββββββββββββββββββ | |
# Request body accepted by the vocal-removal handler (`uvr_remove`).
class UVRRequest(BaseModel):
    audio_url: HttpUrl  # remote audio file to separate
    model_filename: str  # e.g. "2_HP-UVR.pth"; passed to Separator.load_model
# ββββββββββββββββ patched helper βββββββββββββββββββββββββββββββββββββββββββ | |
def _uvr_separate(audio_path: Path, model_filename: str, out_dir: Path) -> list[Path]:
    """Separate *audio_path* with UVR and return the produced stem files.

    A ``Separator`` is created for this call, configured to write MP3 output
    into *out_dir*, and loaded with *model_filename*.  Paths reported by the
    separator may be relative; they are anchored at *out_dir*, resolved, and
    checked for existence before being returned.

    Raises:
        RuntimeError: when a reported output file does not exist on disk.
    """
    # Imported here rather than at module level, so the dependency is only
    # needed once separation is actually requested.
    from uvr.separator import Separator

    separator = Separator(
        model_file_dir="uvr/tmp/audio-separator-models/",
        output_dir=str(out_dir),
        output_format="MP3",
        normalization_threshold=0.9,
    )
    separator.load_model(model_filename=model_filename)

    def _to_existing_absolute(reported: str) -> Path:
        """Turn one reported path into a verified absolute path."""
        candidate = Path(reported)
        if not candidate.is_absolute():
            candidate = out_dir / candidate
        candidate = candidate.resolve()
        if not candidate.exists():
            raise RuntimeError(f"UVR reported missing file: {candidate}")
        return candidate

    return [_to_existing_absolute(item) for item in separator.separate(str(audio_path))]
# NOTE(review): no route decorator (e.g. @app.post(...)) is visible on this
# handler in the reviewed text — confirm it is registered with `app` somewhere.
async def uvr_remove(req: UVRRequest, background: BackgroundTasks):
    """Download an audio file, split it with UVR and return both stems as JSON.

    The response has the shape
    ``{"vocals": {"filename", "base64_wav"}, "instrumental": {...}}``.
    The ``base64_wav`` key is kept for client compatibility even though
    ``_uvr_separate`` configures the separator for MP3 output.

    All artefacts live in one temporary directory.  On success it is removed
    by a background task; on any failure it is removed immediately, because
    background tasks are not executed when the handler raises.

    Raises:
        HTTPException: 400 for an invalid / unknown model file name or a
            failed download, 500 when separation fails or yields fewer than
            two stems.
    """
    # The model name comes straight from the client: accept a bare file name
    # only, never something that could point outside the model directory.
    model_name = req.model_filename
    if (
        not model_name
        or model_name in (".", "..")
        or "/" in model_name
        or "\\" in model_name
    ):
        raise HTTPException(status_code=400, detail="Model file not found.")

    tmp = Path(tempfile.mkdtemp(prefix="uvr_"))
    try:
        # 1. download the audio to be separated.  A URL without a file name
        #    falls back to a placeholder instead of resolving to `tmp` itself.
        src = tmp / (Path(req.audio_url.path or "").name or "input_audio")
        await _download(str(req.audio_url), src)

        # 2. call UVR (blocking -> worker thread)
        try:
            stems = await asyncio.to_thread(_uvr_separate, src, model_name, tmp)
        except FileNotFoundError as exc:
            raise HTTPException(status_code=400, detail="Model file not found.") from exc
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"UVR failed: {e}") from e

        if not stems or len(stems) < 2:
            raise HTTPException(status_code=500, detail="UVR did not produce two stems.")

        # 3. find which stem is vocals / instrumental (UVR naming convention).
        #    When no stem is named "instrumental", take the first stem that is
        #    not the vocals one, so both entries never point at the same file.
        vocals_path = next((p for p in stems if "vocal" in p.stem.lower()), stems[0])
        inst_path = next((p for p in stems if "instrumental" in p.stem.lower()), None)
        if inst_path is None:
            inst_path = next((p for p in stems if p != vocals_path), stems[1])

        def _b64(path: Path) -> str:
            """Read *path* and return its content base64-encoded as ASCII text."""
            return base64.b64encode(path.read_bytes()).decode("ascii")

        def _build_payload() -> dict:
            """Read and encode both stems (file I/O, hence run off the event loop)."""
            return {
                "vocals": {
                    "filename": vocals_path.name,
                    "base64_wav": _b64(vocals_path),
                },
                "instrumental": {
                    "filename": inst_path.name,
                    "base64_wav": _b64(inst_path),
                },
            }

        # 4. encode and build response
        payload = await asyncio.to_thread(_build_payload)
    except BaseException:
        shutil.rmtree(tmp, ignore_errors=True)
        raise

    # 5. clean up temp artefacts: the stems and the download all live in `tmp`.
    background.add_task(shutil.rmtree, tmp, ignore_errors=True)
    return JSONResponse(content=payload)