|
import sounddevice as sd |
|
import webrtcvad |
|
import numpy as np |
|
from scipy.io.wavfile import write |
|
from faster_whisper import WhisperModel |
|
import time |
|
import os |
|
|
|
# --- Audio capture configuration ---
SAMPLE_RATE = 16000  # Hz; 16 kHz is one of the rates webrtcvad supports -- TODO confirm mic supports it

FRAME_DURATION = 30  # ms per VAD frame (webrtcvad expects 10/20/30 ms frames -- see its docs)

FRAME_SIZE = int(SAMPLE_RATE * FRAME_DURATION / 1000)  # samples per VAD frame

SILENCE_THRESHOLD = 0.5  # seconds of continuous silence that ends a recording

MAX_RECORD_SECONDS = 15  # intended hard cap on a single recording's length

MIN_SPEECH_DURATION = 0.3  # seconds; shorter captures are discarded as noise



# Load the transcription model once at startup (CPU, int8-quantized "small").
print("📥 Loading Whisper model...")

model = WhisperModel("small", device="cpu", compute_type="int8")
|
|
|
|
|
def record_and_detect(filename="audio.wav"):
    """Record microphone audio until the speaker pauses, then save a WAV.

    Reads 30 ms frames from the default input device and classifies each
    with WebRTC VAD (aggressiveness 2). Recording ends when speech has
    been heard followed by SILENCE_THRESHOLD seconds of silence, or when
    MAX_RECORD_SECONDS of audio has accumulated. Trailing silence after
    the last speech frame is trimmed before saving.

    Args:
        filename: Path of the 16-bit mono WAV file to write.

    Returns:
        The filename on success, or None when no usable speech was
        captured (no speech at all, or shorter than MIN_SPEECH_DURATION).
    """
    vad = webrtcvad.Vad(2)
    frames = []  # list of (pcm_frame_int16, is_speech) tuples
    silence_counter = 0
    speech_detected = False
    # Consecutive silent frames that count as "the speaker stopped".
    max_silence_frames = int(SILENCE_THRESHOLD * 1000 / FRAME_DURATION)
    # BUG FIX: MAX_RECORD_SECONDS was defined but never enforced, so the
    # loop ran forever if speech was never detected. Cap total frames.
    max_total_frames = int(MAX_RECORD_SECONDS * 1000 / FRAME_DURATION)

    stream = sd.InputStream(samplerate=SAMPLE_RATE, channels=1, dtype='int16', blocksize=FRAME_SIZE)
    stream.start()
    print("🎙️ 说话开始(说完停顿自动结束)...")

    try:
        while True:
            frame, _ = stream.read(FRAME_SIZE)
            pcm = frame.flatten()
            is_speech = vad.is_speech(pcm.tobytes(), SAMPLE_RATE)

            frames.append((pcm.copy(), is_speech))

            if is_speech:
                silence_counter = 0
                speech_detected = True
            else:
                silence_counter += 1

            if speech_detected and silence_counter >= max_silence_frames:
                print("🛑 停顿检测完成,结束录音")
                break

            # Safety cap so a noisy/always-silent environment cannot
            # keep the recorder running indefinitely.
            if len(frames) >= max_total_frames:
                print("🛑 达到最长录音时间,结束录音")
                break
    finally:
        stream.stop()
        stream.close()

    # BUG FIX: without this guard, a capped recording containing no
    # speech at all would be saved as if it were a valid utterance
    # (and an empty frame list would crash np.concatenate below).
    if not speech_detected or not frames:
        print("⚠️ 忽略无效短录音")
        return None

    # Trim trailing silence: keep everything up to the last speech frame.
    cut_index = len(frames)
    for i in range(len(frames) - 1, -1, -1):
        if frames[i][1]:
            cut_index = i + 1
            break

    trimmed_audio = np.concatenate([frames[i][0] for i in range(cut_index)])
    duration = len(trimmed_audio) / SAMPLE_RATE

    if duration < MIN_SPEECH_DURATION:
        print("⚠️ 忽略无效短录音")
        return None

    write(filename, SAMPLE_RATE, trimmed_audio.astype(np.int16))
    # BUG FIX: the f-string printed a literal "(unknown)" placeholder
    # instead of the actual saved path.
    print(f"💾 已保存音频:{filename} (长度: {duration:.2f}s)")
    return filename
|
|
|
|
|
def transcribe(filename):
    """Transcribe a WAV file with the module-level Whisper model and print
    the detected language, each recognized segment, and the elapsed time."""
    print("🔍 开始转录...")
    started = time.time()
    segments, info = model.transcribe(filename, beam_size=3)
    elapsed = time.time() - started

    print(f"✅ 检测语言: {info.language}")
    # Materialize the generator so we can test for emptiness first.
    results = list(segments)
    if results:
        print("📄 识别内容:")
        for seg in results:
            print(f"[{seg.start:.2f}s → {seg.end:.2f}s] {seg.text}")
    else:
        print("⚠️ 没识别到语音内容")
    print(f"⏱️ 转录耗时:{elapsed:.2f}s")
|
|
|
|
|
if __name__ == "__main__":
    # Continuous loop: record one utterance, transcribe it, repeat.
    try:
        while True:
            audio_file = record_and_detect()
            if audio_file:
                transcribe(audio_file)
            print("\n✅ 等待下一轮语音输入(Ctrl+C退出)...\n")
    except KeyboardInterrupt:
        # BUG FIX: the prompt instructs the user to press Ctrl+C, but the
        # interrupt was uncaught and dumped a traceback; exit cleanly.
        print("\n👋 已退出")
|