Spaces:
Running
Running
""" | |
Main entry point for the Audio Translation Web Application | |
Handles file upload, processing pipeline, and UI rendering | |
""" | |
import logging | |
# Configure root logging once at import time: INFO and above is sent both to
# the file "app.log" and to the console stream, using one shared format.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("app.log"), logging.StreamHandler()],
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
import streamlit as st | |
import os | |
import time | |
import subprocess | |
from utils.stt import transcribe_audio | |
from utils.translation import translate_text | |
from utils.tts import get_tts_engine, generate_speech | |
# Make sure the working directories for uploaded files and generated audio
# exist before any request is handled; already-existing directories are fine.
for _work_dir in ("temp/uploads", "temp/outputs"):
    os.makedirs(_work_dir, exist_ok=True)
def configure_page():
    """Set up the Streamlit page configuration and inject custom CSS.

    Calls ``st.set_page_config`` (title, icon, wide layout, expanded sidebar)
    and then emits a ``<style>`` block through ``st.markdown`` with
    ``unsafe_allow_html=True``.
    """
    logger.info("Configuring Streamlit page")
    st.set_page_config(
        page_title="Audio Translator",
        # The icon was stored as mis-encoded text ("π§"); restored to the
        # headphone emoji, which matches the audio theme of the app.
        page_icon="🎧",
        layout="wide",
        initial_sidebar_state="expanded",
    )
    # CSS overrides: shift the report container up, hide the main menu and the
    # deploy button, and pad alert boxes.
    st.markdown("""
<style>
.reportview-container {margin-top: -2em;}
#MainMenu {visibility: hidden;}
.stDeployButton {display:none;}
.stAlert {padding: 20px !important;}
</style>
""", unsafe_allow_html=True)
def handle_file_processing(upload_path, voice="zf_xiaobei"):
    """Run the full processing pipeline for one uploaded audio file.

    The pipeline has three phases, each reflected in a Streamlit progress bar
    and status placeholder created by this function:

    1. Speech-to-Text (STT) via ``transcribe_audio``
    2. Machine translation via ``translate_text``
    3. Text-to-Speech (TTS) via the engine from ``get_tts_engine``

    On success the translated text is also stored in
    ``st.session_state.current_text``.

    Args:
        upload_path: Path of the uploaded audio file to transcribe.
        voice: Voice identifier handed to the TTS engine. Defaults to
            "zf_xiaobei", the value that used to be hard-coded here, so
            existing callers keep the same behavior.

    Returns:
        A tuple ``(english_text, chinese_text, output_path)`` where
        ``output_path`` is the value returned by ``engine.generate_speech``.

    Raises:
        Exception: Any error raised by a pipeline phase is logged, shown in
            the UI, and then re-raised unchanged.
    """
    logger.info("Starting processing for: %s", upload_path)
    progress_bar = st.progress(0)
    status_text = st.empty()

    # NOTE(review): the leading symbols in the status strings below look like
    # mis-encoded emoji; they are kept as-is because the intended glyphs
    # cannot be determined from this file - confirm and restore.
    try:
        # STT phase
        logger.info("Beginning STT processing")
        status_text.markdown("π **Performing Speech Recognition...**")
        with st.spinner("Initializing Whisper model..."):
            english_text = transcribe_audio(upload_path)
        progress_bar.progress(30)
        logger.info("STT completed. Text length: %s characters", len(english_text))

        # Translation phase
        logger.info("Beginning translation")
        status_text.markdown("π **Translating Content...**")
        with st.spinner("Loading translation model..."):
            chinese_text = translate_text(english_text)
        progress_bar.progress(60)
        logger.info(
            "Translation completed. Translated length: %s characters",
            len(chinese_text),
        )

        # TTS phase
        logger.info("Beginning TTS generation")
        status_text.markdown("π΅ **Generating Chinese Speech...**")
        # Initialize TTS engine with the language code used for Mandarin Chinese
        engine = get_tts_engine(lang_code='z')
        # Generate speech; the engine returns the location of the audio file
        output_path = engine.generate_speech(chinese_text, voice=voice)
        progress_bar.progress(100)
        logger.info("TTS completed. Output file: %s", output_path)

        # Keep the translated text around for streaming playback
        st.session_state.current_text = chinese_text
        status_text.success("β Processing Complete!")
        return english_text, chinese_text, output_path
    except Exception as e:
        # logger.exception records the traceback, like error(..., exc_info=True)
        logger.exception("Processing failed: %s", e)
        status_text.error(f"β Processing Failed: {str(e)}")
        st.exception(e)
        raise
def render_results(english_text, chinese_text, output_path):
    """Display the pipeline results in two columns.

    The left column shows the recognized and translated text. The right
    column shows an audio player and a download button for ``output_path``,
    plus a "Stream Audio" button that regenerates the speech chunk by chunk
    and plays each chunk in a single placeholder.

    Args:
        english_text: Recognized source text shown under "Recognition Results".
        chinese_text: Translated text shown under "Translation Results" and
            used as input for streaming synthesis.
        output_path: Audio file passed to ``st.audio`` and opened in binary
            mode for the download button.
    """
    logger.info("Rendering results")
    st.divider()
    col1, col2 = st.columns([2, 1])

    with col1:
        st.subheader("Recognition Results")
        st.code(english_text, language="text")
        st.subheader("Translation Results")
        st.code(chinese_text, language="text")

    with col2:
        st.subheader("Audio Output")
        # Standard audio player for the full file
        st.audio(output_path)

        # Download button
        with open(output_path, "rb") as f:
            st.download_button(
                label="Download Audio",
                data=f,
                file_name="translated_audio.wav",
                mime="audio/wav",
            )

        # Streaming playback controls
        st.subheader("Streaming Playback")
        if st.button("Stream Audio"):
            # Imported once, and only when streaming is requested, instead of
            # on every loop iteration.
            import soundfile as sf

            engine = get_tts_engine(lang_code='z')
            streaming_placeholder = st.empty()

            # Stream the audio in chunks
            for sample_rate, audio_chunk in engine.generate_speech_stream(
                chinese_text,
                voice="zf_xiaobei",
            ):
                # Each chunk goes through a short-lived WAV file
                temp_chunk_path = f"temp/outputs/chunk_{time.time()}.wav"
                try:
                    sf.write(temp_chunk_path, audio_chunk, sample_rate)
                    # Play the chunk. sample_rate is not passed to st.audio:
                    # it only applies to NumPy array input, and the WAV file
                    # already carries its own sample rate.
                    with streaming_placeholder:
                        st.audio(temp_chunk_path)
                finally:
                    # Remove the chunk file even if writing or rendering
                    # failed, so temp/outputs does not accumulate leftovers.
                    if os.path.exists(temp_chunk_path):
                        os.remove(temp_chunk_path)
def initialize_session_state():
    """Seed the Streamlit session state with the keys this app relies on.

    ``current_text`` is created with the value ``None`` only when it is not
    already present, so a value stored earlier in the session is kept.
    """
    if 'current_text' in st.session_state:
        return
    st.session_state.current_text = None
def main():
    """Run the application workflow.

    Configures the page, initializes session state, renders the title and the
    sidebar TTS settings, and - once a file is uploaded - saves it under
    ``temp/uploads``, runs ``handle_file_processing`` on it and passes the
    result to ``render_results``.

    Streamlit re-executes the script on every widget interaction, so the
    pipeline result is cached in ``st.session_state["pipeline_results"]`` per
    uploaded file (name and size) and reused on later reruns instead of
    re-running the whole pipeline.
    """
    logger.info("Starting application")
    configure_page()
    initialize_session_state()

    # The emoji and arrow below were stored as mis-encoded text ("π§", "β")
    # and are restored from context.
    st.title("🎧 High-Quality Audio Translation System")
    st.markdown("Upload English Audio → Get Chinese Speech Output")

    # Voice selection in sidebar
    # NOTE(review): selected_voice and speed are collected here but are not
    # passed on to the processing pipeline by this function - wire them in
    # once the pipeline accepts them.
    st.sidebar.header("TTS Settings")
    voice_options = {
        "Xiaobei (Female)": "zf_xiaobei",
        "Yunjian (Male)": "zm_yunjian",
    }
    selected_voice = st.sidebar.selectbox("Select Voice", list(voice_options))
    speed = st.sidebar.slider("Speech Speed", 0.5, 2.0, 1.0, 0.1)

    uploaded_file = st.file_uploader(
        "Select Audio File (MP3/WAV)",
        type=["mp3", "wav"],
        accept_multiple_files=False,
    )
    if not uploaded_file:
        return

    logger.info("File uploaded: %s", uploaded_file.name)

    # The file name comes from the client. Keep only its last path component
    # (treating backslashes as separators too) so the upload cannot be written
    # outside temp/uploads.
    safe_name = os.path.basename(uploaded_file.name.replace("\\", "/"))

    cache_key = (safe_name, uploaded_file.size)
    cached = st.session_state.get("pipeline_results")
    if cached is not None and cached["key"] == cache_key:
        # Same upload as on a previous rerun: reuse the finished results.
        results = cached["results"]
    else:
        upload_path = os.path.join("temp/uploads", safe_name)
        with open(upload_path, "wb") as f:
            f.write(uploaded_file.getbuffer())
        # Errors are reported by handle_file_processing and re-raised; nothing
        # is cached in that case, so the next rerun retries.
        results = handle_file_processing(upload_path)
        st.session_state["pipeline_results"] = {
            "key": cache_key,
            "results": results,
        }

    if results:
        render_results(*results)
# Script entry point: run the app only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()