# HoneyTian's picture
# update
# 027a68c
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import argparse
from functools import lru_cache
import json
import logging
from pathlib import Path
import platform
import tempfile
import time
import uuid
import gradio as gr
import librosa
import numpy as np
from scipy.io import wavfile
import log
from project_settings import environment, project_path, log_directory
from toolbox.os.command import Command
from toolbox.age_and_gender.models.audeering import AudeeringModel
# Configure logging through the project-local `log` module, passing it the
# project's log directory.
# NOTE(review): presumably installs size-based rotating file handlers under
# `log_directory` - confirm in `log.setup_size_rotating`.
log.setup_size_rotating(log_directory=log_directory)
# Logger named "main"; used by the request handlers defined in this file.
logger = logging.getLogger("main")
def get_args():
    """Parse the command-line options of the demo application.

    Returns:
        argparse.Namespace: holds ``examples_dir``, the directory containing
        the example audio files. It defaults to ``data/examples`` below
        ``project_path``.
    """
    default_examples_dir = (project_path / "data/examples").as_posix()

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--examples_dir", type=str, default=default_examples_dir)
    return arg_parser.parse_args()
def save_input_audio(sample_rate: int, signal: np.ndarray) -> str:
    """Write an audio signal to a uniquely named WAV file in the temp directory.

    Args:
        sample_rate: sampling rate stored in the WAV file.
        signal: audio samples, handed unchanged to ``scipy.io.wavfile.write``.

    Returns:
        POSIX-style path of the written file inside ``<tempdir>/input_audio``.
    """
    output_dir = Path(tempfile.gettempdir()) / "input_audio"
    output_dir.mkdir(parents=True, exist_ok=True)

    # The file name is a random UUID, so repeated calls never reuse a path.
    wav_path = (output_dir / f"{uuid.uuid4()}.wav").as_posix()
    wavfile.write(wav_path, sample_rate, signal)
    return wav_path
def shell(cmd: str):
    """Run ``cmd`` through ``Command.popen`` and return whatever it returns.

    NOTE(review): this function is the click handler of the "shell" tab in the
    web UI, so every client that can reach the server can submit arbitrary
    command lines. Confirm that this is intended before exposing the app on a
    public interface.
    """
    output = Command.popen(cmd)
    return output
# Registry of the selectable engines, keyed by the name shown in the UI.
# Each entry holds the inference class, the keyword arguments used to build
# it, and the sample rate the audio is resampled to before inference.
# On Windows the model is read from a local directory below `project_path`;
# on every other platform the "audeering/..." model identifier is used.
age_and_gender_model_map = {
    engine_name: {
        "infer_cls": AudeeringModel,
        "kwargs": {
            "model_path": (
                (project_path / local_model_dir).as_posix()
                if platform.system() == "Windows"
                else model_id
            ),
        },
        "sample_rate": 16000,
    }
    for engine_name, local_model_dir, model_id in (
        (
            "audeering-6-ft",
            "pretrained_models/wav2vec2-large-robust-6-ft-age-gender",
            "audeering/wav2vec2-large-robust-6-ft-age-gender",
        ),
        (
            "audeering-24-ft",
            "pretrained_models/wav2vec2-large-robust-24-ft-age-gender",
            "audeering/wav2vec2-large-robust-24-ft-age-gender",
        ),
    )
}
@lru_cache(maxsize=3)
def load_get_age_and_gender_model(infer_cls, **kwargs):
    """Build ``infer_cls(**kwargs)`` and memoize the instance.

    The LRU cache keeps up to three instances, keyed by the class and its
    keyword arguments, so an engine is constructed only once per
    configuration. All arguments must be hashable.

    Args:
        infer_cls: callable that constructs the inference engine.
        **kwargs: keyword arguments forwarded to ``infer_cls``.

    Returns:
        The (possibly cached) engine instance.
    """
    return infer_cls(**kwargs)
def when_click_get_age_and_gender_button(audio_t, engine: str):
    """Estimate age and gender for the submitted audio with the chosen engine.

    The audio is written to a temporary WAV file, reloaded through librosa at
    the engine's sample rate, and passed to the cached inference engine.

    Args:
        audio_t: ``(sample_rate, signal)`` pair from the audio component, or
            ``None`` when the user submitted no audio.
        engine: key into ``age_and_gender_model_map``.

    Returns:
        Pretty-printed JSON string containing the engine's output plus
        ``duration`` (seconds of audio), ``time_cost`` (seconds spent in
        inference) and ``rtf`` (``time_cost / duration``).

    Raises:
        gr.Error: when no audio was supplied, the engine name is unknown, the
            audio is empty, or loading/inference fails.
    """
    # Validate the inputs first so a bad request never touches the disk.
    if audio_t is None:
        raise gr.Error("audio is required.")

    infer_engine_param = age_and_gender_model_map.get(engine)
    if infer_engine_param is None:
        raise gr.Error(f"invalid age_and_gender engine: {engine}.")

    sample_rate, signal = audio_t
    filename = save_input_audio(sample_rate, signal)
    logger.info(f"run get_age_and_gender; engine: {engine}.")

    try:
        infer_cls = infer_engine_param["infer_cls"]
        kwargs = infer_engine_param["kwargs"]
        sample_rate = infer_engine_param["sample_rate"]

        # Reload from disk so librosa resamples to the engine's sample rate.
        signal, _ = librosa.load(filename, sr=sample_rate)
        if len(signal) == 0:
            # Without this guard the rtf computation divides by zero.
            raise gr.Error("audio is empty.")
        duration = len(signal) / sample_rate

        infer_engine = load_get_age_and_gender_model(infer_cls=infer_cls, **kwargs)

        # perf_counter is monotonic, unlike time.time(), so the measured
        # interval cannot be skewed by wall-clock adjustments.
        time_begin = time.perf_counter()
        age_and_gender = infer_engine(signal, sample_rate)
        time_cost = time.perf_counter() - time_begin

        result = json.dumps(
            {
                **age_and_gender,
                "duration": round(duration, 4),
                "time_cost": round(time_cost, 4),
                "rtf": round(time_cost / duration, 4),
            },
            ensure_ascii=False,
            indent=4,
        )
    except gr.Error:
        # Already a user-facing error; do not wrap it a second time.
        raise
    except Exception as e:
        logger.exception("get_age_and_gender failed; engine: %s.", engine)
        raise gr.Error(
            f"get_age_and_gender failed, error type: {type(e)}, error text: {str(e)}."
        ) from e
    finally:
        # The WAV file is only an intermediate; remove it so the temp
        # directory does not grow with every request.
        try:
            Path(filename).unlink()
        except OSError:
            logger.warning("failed to remove temporary file: %s", filename)

    return result
def main():
    """Build the Gradio UI and start the web server.

    The UI has two tabs: "age_and_gender", which runs
    ``when_click_get_age_and_gender_button`` on an audio input, and "shell",
    which passes the entered text to ``shell``.
    """
    args = get_args()

    # examples
    examples_dir = Path(args.examples_dir)

    # choices
    age_and_gender_model_choices = list(age_and_gender_model_map.keys())
    default_engine = age_and_gender_model_choices[0]

    # ui
    with gr.Blocks() as blocks:
        with gr.Tabs():
            with gr.TabItem("age_and_gender"):
                with gr.Row():
                    with gr.Column(variant="panel", scale=5):
                        ag_audio = gr.Audio(label="audio")
                        ag_engine = gr.Dropdown(
                            choices=age_and_gender_model_choices,
                            value=default_engine,
                            label="engine",
                        )
                        ag_button = gr.Button(variant="primary")
                    with gr.Column(variant="panel", scale=5):
                        ag_output = gr.Text(label="output")

                # NOTE(review): the widget nesting was reconstructed; confirm
                # that the examples belong below the row and not in a column.
                gr.Examples(
                    examples=[
                        [filename.as_posix(), default_engine]
                        # glob order is arbitrary; sort for a stable listing.
                        for filename in sorted(examples_dir.glob("*.wav"))
                    ],
                    inputs=[ag_audio, ag_engine],
                    outputs=[ag_output],
                    fn=when_click_get_age_and_gender_button,
                )
                ag_button.click(
                    when_click_get_age_and_gender_button,
                    inputs=[ag_audio, ag_engine],
                    outputs=[ag_output],
                )
            with gr.TabItem("shell"):
                shell_text = gr.Textbox(label="cmd")
                shell_button = gr.Button("run")
                shell_output = gr.Textbox(label="output")

                shell_button.click(
                    shell,
                    inputs=[shell_text],
                    outputs=[shell_output],
                )

    # http://127.0.0.1:7860/
    # http://10.75.27.247:7861/
    blocks.queue().launch(
        # Both branches of the former platform check were False.
        share=False,
        # Listens on every interface.
        # NOTE(review): together with the "shell" tab this exposes command
        # execution to the network - bind to 127.0.0.1 for local-only use.
        server_name="0.0.0.0",
        server_port=environment.get("port", 7860, dtype=int),
    )


if __name__ == "__main__":
    main()