File size: 3,222 Bytes
b6c45cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# DPTNet_quant_sep.py

import os
import torch
import numpy as np
import torchaudio
from huggingface_hub import hf_hub_download
from . import asteroid_test

torchaudio.set_audio_backend("sox_io")

def get_conf():
    conf_filterbank = {
        'n_filters': 64,
        'kernel_size': 16,
        'stride': 8
    }

    conf_masknet = {
        'in_chan': 64,
        'n_src': 2,
        'out_chan': 64,
        'ff_hid': 256,
        'ff_activation': "relu",
        'norm_type': "gLN",
        'chunk_size': 100,
        'hop_size': 50,
        'n_repeats': 2,
        'mask_act': 'sigmoid',
        'bidirectional': True,
        'dropout': 0
    }
    return conf_filterbank, conf_masknet


def load_dpt_model():
    print('Load Separation Model...')

    # 從環境變數取得 Hugging Face Token
    HF_TOKEN = os.getenv("HF_TOKEN")
    if not HF_TOKEN:
        raise EnvironmentError("環境變數 HF_TOKEN 未設定!請先執行 export HF_TOKEN=xxx")

    # 從 Hugging Face Hub 下載模型權重
    model_path = hf_hub_download(
        repo_id="DeepLearning101/speech-separation",  # ← 替換成你的 repo 名稱
        filename="train_dptnet_aishell_partOverlap_B2_300epoch_quan-int8.p",
        token=HF_TOKEN
    )

    # 取得模型參數
    conf_filterbank, conf_masknet = get_conf()

    # 建立模型架構
    model_class = getattr(asteroid_test, "DPTNet")
    model = model_class(**conf_filterbank, **conf_masknet)

    # 套用量化設定
    model = torch.quantization.quantize_dynamic(
        model,
        {torch.nn.LSTM, torch.nn.Linear},
        dtype=torch.qint8
    )

    # 載入權重(忽略不匹配的 keys)
    state_dict = torch.load(model_path, map_location="cpu")
    model_state_dict = model.state_dict()
    filtered_state_dict = {k: v for k, v in state_dict.items() if k in model_state_dict}
    model.load_state_dict(filtered_state_dict, strict=False)
    model.eval()

    return model


def dpt_sep_process(wav_path, model=None, outfilename=None):
    if model is None:
        model = load_dpt_model()

    x, sr = torchaudio.load(wav_path)
    x = x.cpu()

    with torch.no_grad():
        est_sources = model(x)  # shape: (1, 2, T)

    est_sources = est_sources.squeeze(0)  # shape: (2, T)
    sep_1, sep_2 = est_sources  # 拆成兩個 (T,) 的 tensor

    # 正規化
    max_abs = x[0].abs().max().item()
    sep_1 = sep_1 * max_abs / sep_1.abs().max().item()
    sep_2 = sep_2 * max_abs / sep_2.abs().max().item()

    # 增加 channel 維度,變為 (1, T)
    sep_1 = sep_1.unsqueeze(0)
    sep_2 = sep_2.unsqueeze(0)

    # 儲存結果
    if outfilename is not None:
        torchaudio.save(outfilename.replace('.wav', '_sep1.wav'), sep_1, sr)
        torchaudio.save(outfilename.replace('.wav', '_sep2.wav'), sep_2, sr)
        torchaudio.save(outfilename.replace('.wav', '_mix.wav'), x, sr)
    else:
        torchaudio.save(wav_path.replace('.wav', '_sep1.wav'), sep_1, sr)
        torchaudio.save(wav_path.replace('.wav', '_sep2.wav'), sep_2, sr)


if __name__ == '__main__':
    print("This module should be used via Flask or Gradio.")