Spaces:
Running
Running
File size: 6,475 Bytes
c549dab cd1309d c72d839 c549dab cd1309d c549dab cd1309d c549dab 2d2f2b9 cd1309d c549dab cd1309d c549dab cd1309d c549dab cd1309d 2477bc4 c549dab c72d839 cd1309d 2477bc4 c549dab c72d839 cd1309d 2477bc4 c549dab 34f1262 c549dab c72d839 c549dab cd1309d c549dab cd1309d c72d839 c549dab cd1309d c549dab cd1309d c549dab cd1309d c549dab cd1309d c549dab 34f1262 c549dab 34f1262 c549dab 34f1262 f0248ed c549dab cd1309d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
"""
Main entry point for the Audio Translation Web Application
Handles file upload, processing pipeline, and UI rendering
"""
import logging
# Root logging configuration: INFO and above, written to "app.log" and to the
# console stream, with timestamp / logger name / level in every record.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler(),
    ],
)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
import streamlit as st
import os
import time
import subprocess
from utils.stt import transcribe_audio
from utils.translation import translate_text
from utils.tts import get_tts_engine, generate_speech
# Make sure the working directories for uploaded files and generated audio
# exist before any request is handled (no error if they are already there).
for _workdir in ("temp/uploads", "temp/outputs"):
    os.makedirs(_workdir, exist_ok=True)
def configure_page():
    """Set up the Streamlit page configuration and inject custom CSS.

    Sets the page title and icon, selects the wide layout with the sidebar
    expanded, and then injects a small stylesheet through ``st.markdown``.
    """
    logger.info("Configuring Streamlit page")
    st.set_page_config(
        page_title="Audio Translator",
        # The icon literal had been mis-encoded (it showed up as the two
        # characters "π§"); restore the headphone emoji, matching the page
        # title used by the application.
        page_icon="🎧",
        layout="wide",
        initial_sidebar_state="expanded",
    )
    # Stylesheet content is kept exactly as before: it adjusts the top
    # margin, hides the "#MainMenu" and ".stDeployButton" elements, and pads
    # alert boxes.
    custom_css = (
        "\n"
        "<style>\n"
        ".reportview-container {margin-top: -2em;}\n"
        "#MainMenu {visibility: hidden;}\n"
        ".stDeployButton {display:none;}\n"
        ".stAlert {padding: 20px !important;}\n"
        "</style>\n"
    )
    st.markdown(custom_css, unsafe_allow_html=True)
def handle_file_processing(upload_path):
    """Run the complete processing pipeline on an uploaded audio file.

    Phases:
        1. Speech-to-Text (STT) via ``transcribe_audio``
        2. Machine translation via ``translate_text``
        3. Text-to-Speech (TTS) via the engine from ``get_tts_engine``

    A progress bar and a status placeholder are updated after each phase, and
    the translated text is stored in ``st.session_state.current_text``.

    Args:
        upload_path: Path of the uploaded audio file to process.

    Returns:
        A ``(english_text, chinese_text, output_path)`` tuple.

    Raises:
        Exception: Any error raised by a pipeline phase is logged with its
            traceback, shown in the UI, and re-raised to the caller.
    """
    logger.info("Starting processing for: %s", upload_path)
    progress_bar = st.progress(0)
    status_text = st.empty()
    # NOTE(review): the leading "π" / "β" glyphs in some status strings below
    # look like mis-encoded emoji whose original symbol cannot be recovered
    # from this file; they are left untouched — confirm the intended symbols.
    try:
        # STT phase
        logger.info("Beginning STT processing")
        status_text.markdown("π **Performing Speech Recognition...**")
        with st.spinner("Initializing Whisper model..."):
            english_text = transcribe_audio(upload_path)
        progress_bar.progress(30)
        logger.info(
            "STT completed. Text length: %s characters", len(english_text)
        )

        # Translation phase
        logger.info("Beginning translation")
        status_text.markdown("π **Translating Content...**")
        with st.spinner("Loading translation model..."):
            chinese_text = translate_text(english_text)
        progress_bar.progress(60)
        logger.info(
            "Translation completed. Translated length: %s characters",
            len(chinese_text),
        )

        # TTS phase
        logger.info("Beginning TTS generation")
        # Mis-encoded "π΅" restored to the music-note emoji.
        status_text.markdown("🎵 **Generating Chinese Speech...**")
        # 'z' is the language code used here for Mandarin Chinese.
        engine = get_tts_engine(lang_code='z')
        # The engine returns the path of the audio file it produced.
        output_path = engine.generate_speech(chinese_text, voice="zf_xiaobei")
        progress_bar.progress(100)
        logger.info("TTS completed. Output file: %s", output_path)

        # Keep the translated text around for streaming playback.
        st.session_state.current_text = chinese_text
        # This literal had been split across two lines by a mis-encoded
        # character (an unterminated string); presumably a check-mark emoji.
        status_text.success("✅ Processing Complete!")
        return english_text, chinese_text, output_path
    except Exception as e:
        # logger.exception records the traceback, like error(exc_info=True).
        logger.exception("Processing failed: %s", e)
        status_text.error(f"β Processing Failed: {e}")
        st.exception(e)
        raise
def render_results(english_text, chinese_text, output_path):
    """Display processing results in two columns.

    The left column shows the recognised and translated text. The right
    column shows an audio player and a download button for the generated
    file, plus a button that streams the speech chunk by chunk.

    Args:
        english_text: Text produced by speech recognition.
        chinese_text: Translated text; also the input for streamed speech.
        output_path: Path of the generated audio file.
    """
    logger.info("Rendering results")
    st.divider()
    col1, col2 = st.columns([2, 1])

    with col1:
        st.subheader("Recognition Results")
        st.code(english_text, language="text")
        st.subheader("Translation Results")
        st.code(chinese_text, language="text")

    with col2:
        st.subheader("Audio Output")
        # Standard audio player for the full file
        st.audio(output_path)

        # Download button
        with open(output_path, "rb") as f:
            st.download_button(
                label="Download Audio",
                data=f,
                file_name="translated_audio.wav",
                mime="audio/wav",
            )

        # Streaming playback controls
        st.subheader("Streaming Playback")
        if st.button("Stream Audio"):
            # Imported lazily so soundfile is only required when streaming;
            # done once here instead of on every loop iteration.
            import soundfile as sf

            engine = get_tts_engine(lang_code='z')
            streaming_placeholder = st.empty()
            chunk_stream = engine.generate_speech_stream(
                chinese_text,
                voice="zf_xiaobei",
            )
            for sample_rate, audio_chunk in chunk_stream:
                # Each chunk goes through a short-lived WAV file.
                temp_chunk_path = f"temp/outputs/chunk_{time.time()}.wav"
                try:
                    sf.write(temp_chunk_path, audio_chunk, sample_rate)
                    with streaming_placeholder:
                        # The sample rate is already written into the WAV
                        # file; st.audio's sample_rate argument only applies
                        # to raw NumPy data and would otherwise just produce
                        # an "ignored" warning for every chunk.
                        st.audio(temp_chunk_path)
                finally:
                    # Remove the chunk file even if writing or rendering
                    # failed, so temp/outputs does not accumulate leftovers.
                    try:
                        os.remove(temp_chunk_path)
                    except FileNotFoundError:
                        pass
def initialize_session_state():
    """Seed the session state with the keys this app expects.

    ``current_text`` is created with the value ``None`` on first use and is
    left untouched when it already exists.
    """
    if 'current_text' in st.session_state:
        return
    st.session_state['current_text'] = None
def main():
    """Run the application workflow: page setup, sidebar, upload, results.

    Configures the page, renders the sidebar TTS settings and the file
    uploader, saves an uploaded file under ``temp/uploads``, runs the
    processing pipeline on it and renders the results.

    Streamlit re-executes the script on every widget interaction, so the
    pipeline results are cached in ``st.session_state`` (keyed by file name
    and content hash) and reused until a different file is uploaded.
    """
    import hashlib  # local import: only needed for the upload cache key

    logger.info("Starting application")
    configure_page()
    initialize_session_state()
    # Mis-encoded "π§" restored to the headphone emoji.
    st.title("🎧 High-Quality Audio Translation System")
    # NOTE(review): the "β" below looks like a mis-encoded symbol (possibly
    # an arrow); left untouched because the original cannot be recovered.
    st.markdown("Upload English Audio β Get Chinese Speech Output")

    # Voice selection in sidebar
    st.sidebar.header("TTS Settings")
    voice_options = {
        "Xiaobei (Female)": "zf_xiaobei",
        "Yunjian (Male)": "zm_yunjian",
    }
    # NOTE(review): the selected voice and speed are collected here but
    # nothing in this function passes them on to the pipeline — confirm
    # whether they should be wired in.
    selected_voice = st.sidebar.selectbox("Select Voice", list(voice_options))
    speed = st.sidebar.slider("Speech Speed", 0.5, 2.0, 1.0, 0.1)

    uploaded_file = st.file_uploader(
        "Select Audio File (MP3/WAV)",
        type=["mp3", "wav"],
        accept_multiple_files=False,
    )
    if not uploaded_file:
        return

    logger.info("File uploaded: %s", uploaded_file.name)
    # The file name comes from the client: keep only its final component so
    # it cannot point outside temp/uploads.
    safe_name = (
        os.path.basename(uploaded_file.name.replace("\\", "/")) or "upload"
    )
    upload_path = os.path.join("temp/uploads", safe_name)

    payload = uploaded_file.getbuffer()
    cache_key = (safe_name, hashlib.sha256(payload).hexdigest())
    cached = st.session_state.get("processed_upload")
    if (
        cached
        and cached["key"] == cache_key
        and os.path.exists(cached["results"][2])
    ):
        # Same file as the previous run and its audio output still exists:
        # reuse the results instead of re-running STT/translation/TTS.
        results = cached["results"]
    else:
        with open(upload_path, "wb") as f:
            f.write(payload)
        results = handle_file_processing(upload_path)
        st.session_state["processed_upload"] = {
            "key": cache_key,
            "results": results,
        }

    if results:
        render_results(*results)
# Script entry point: run the app only when executed directly.
if __name__ == "__main__":
    main()