Michael Hu
Revert "Update README.md"
c549dab
raw
history blame
6.48 kB
"""
Main entry point for the Audio Translation Web Application
Handles file upload, processing pipeline, and UI rendering
"""
import logging
# Route INFO-and-above records to both the "app.log" file and a stream
# handler (StreamHandler() with no argument writes to sys.stderr).
logging.basicConfig(
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
import streamlit as st
import os
import time
import subprocess
from utils.stt import transcribe_audio
from utils.translation import translate_text
from utils.tts import get_tts_engine, generate_speech
# Make sure the working directories exist before any file is written;
# exist_ok=True keeps this harmless when the script is executed again.
for _temp_dir in ("temp/uploads", "temp/outputs"):
    os.makedirs(_temp_dir, exist_ok=True)
def configure_page():
    """Apply the Streamlit page settings and inject the custom CSS."""
    logger.info("Configuring Streamlit page")

    page_settings = {
        "page_title": "Audio Translator",
        "page_icon": "🎧",
        "layout": "wide",
        "initial_sidebar_state": "expanded",
    }
    st.set_page_config(**page_settings)

    # CSS overrides: tighten the top margin, hide the main menu and the
    # deploy button, and enlarge the padding of alert boxes.
    custom_css = (
        "\n"
        "<style>\n"
        ".reportview-container {margin-top: -2em;}\n"
        "#MainMenu {visibility: hidden;}\n"
        ".stDeployButton {display:none;}\n"
        ".stAlert {padding: 20px !important;}\n"
        "</style>\n"
    )
    st.markdown(custom_css, unsafe_allow_html=True)
def handle_file_processing(upload_path, voice="zf_xiaobei"):
    """
    Execute the complete processing pipeline:
    1. Speech-to-Text (STT)
    2. Machine Translation
    3. Text-to-Speech (TTS)

    Progress is reported through a Streamlit progress bar (30 / 60 / 100)
    and a single status placeholder that is overwritten at each phase.

    Args:
        upload_path: Path of the audio file handed to ``transcribe_audio``.
        voice: Voice identifier passed to the TTS engine's
            ``generate_speech``. Defaults to "zf_xiaobei", the voice this
            function used when it was hard-coded.

    Returns:
        Tuple ``(english_text, chinese_text, output_path)``: the
        transcription, its translation, and the value returned by the TTS
        engine's ``generate_speech`` (logged here as the output file).

    Raises:
        Exception: Any error raised by a pipeline stage is logged with its
            traceback, shown in the UI, and then re-raised unchanged.
    """
    logger.info(f"Starting processing for: {upload_path}")
    progress_bar = st.progress(0)
    status_text = st.empty()

    try:
        # STT Phase
        logger.info("Beginning STT processing")
        status_text.markdown("🔍 **Performing Speech Recognition...**")
        with st.spinner("Initializing Whisper model..."):
            english_text = transcribe_audio(upload_path)
        progress_bar.progress(30)
        logger.info(f"STT completed. Text length: {len(english_text)} characters")

        # Translation Phase
        logger.info("Beginning translation")
        status_text.markdown("🌐 **Translating Content...**")
        with st.spinner("Loading translation model..."):
            chinese_text = translate_text(english_text)
        progress_bar.progress(60)
        logger.info(f"Translation completed. Translated length: {len(chinese_text)} characters")

        # TTS Phase
        logger.info("Beginning TTS generation")
        status_text.markdown("🎵 **Generating Chinese Speech...**")

        # Initialize TTS engine with appropriate language code for Chinese
        engine = get_tts_engine(lang_code='z')  # 'z' for Mandarin Chinese

        # Generate speech and get the file path
        output_path = engine.generate_speech(chinese_text, voice=voice)
        progress_bar.progress(100)
        logger.info(f"TTS completed. Output file: {output_path}")

        # Store the text for streaming playback
        st.session_state.current_text = chinese_text

        status_text.success("✅ Processing Complete!")
        return english_text, chinese_text, output_path
    except Exception as e:
        # Top-level boundary for the pipeline: record the traceback, surface
        # the failure in the UI, then let the caller see the same exception.
        logger.exception(f"Processing failed: {str(e)}")
        status_text.error(f"❌ Processing Failed: {str(e)}")
        st.exception(e)
        raise
def render_results(english_text, chinese_text, output_path, voice="zf_xiaobei"):
    """
    Display processing results in organized columns

    The wide column shows the recognized and translated text; the narrow
    column shows an audio player, a download button and an on-demand
    streaming playback control.

    Args:
        english_text: Recognized text shown under "Recognition Results".
        chinese_text: Translated text shown under "Translation Results" and
            re-synthesized when "Stream Audio" is pressed.
        output_path: Path of the generated audio file; it is played and
            also opened in binary mode for the download button.
        voice: Voice identifier passed to the TTS engine's
            ``generate_speech_stream``. Defaults to "zf_xiaobei", the voice
            this function used when it was hard-coded.
    """
    logger.info("Rendering results")
    st.divider()

    col1, col2 = st.columns([2, 1])

    with col1:
        st.subheader("Recognition Results")
        st.code(english_text, language="text")
        st.subheader("Translation Results")
        st.code(chinese_text, language="text")

    with col2:
        st.subheader("Audio Output")
        # Standard audio player for the full file
        st.audio(output_path)

        # Download button
        with open(output_path, "rb") as f:
            st.download_button(
                label="Download Audio",
                data=f,
                file_name="translated_audio.wav",
                mime="audio/wav",
            )

        # Streaming playback controls
        st.subheader("Streaming Playback")
        if st.button("Stream Audio"):
            # Imported lazily so the rest of the page does not depend on
            # soundfile, and once per click instead of once per chunk.
            import io

            import soundfile as sf

            engine = get_tts_engine(lang_code='z')
            streaming_placeholder = st.empty()

            # Stream the audio in chunks
            for sample_rate, audio_chunk in engine.generate_speech_stream(
                chinese_text,
                voice=voice,
            ):
                # Encode each chunk as WAV in memory. This avoids the
                # timestamp-named temp files, which could collide and were
                # left behind whenever playback raised before cleanup.
                wav_buffer = io.BytesIO()
                # A file-like target has no extension to infer the
                # container from, so the format must be given explicitly.
                sf.write(wav_buffer, audio_chunk, sample_rate, format="WAV")

                # Play the chunk; each chunk replaces the previous player.
                # sample_rate is not passed because st.audio documents it as
                # applying only to NumPy array input, not encoded bytes.
                streaming_placeholder.audio(
                    wav_buffer.getvalue(),
                    format="audio/wav",
                )
def initialize_session_state():
    """Make sure the session-state entries used by the app exist."""
    if 'current_text' in st.session_state:
        # Already set for this session; leave the stored value alone.
        return
    st.session_state.current_text = None
def main():
    """
    Main application workflow

    Configures the page, draws the sidebar and the file uploader and, once
    a file is uploaded, saves it under "temp/uploads", runs the processing
    pipeline and renders the results.

    Streamlit re-executes this script on every widget interaction, so the
    pipeline results are kept in ``st.session_state["pipeline_cache"]``
    keyed by a digest of the uploaded bytes. Clicking a button on the
    results view therefore re-renders without repeating the pipeline.
    """
    import hashlib

    logger.info("Starting application")
    configure_page()
    initialize_session_state()

    st.title("🎧 High-Quality Audio Translation System")
    st.markdown("Upload English Audio → Get Chinese Speech Output")

    # Voice selection in sidebar
    st.sidebar.header("TTS Settings")
    voice_options = {
        "Xiaobei (Female)": "zf_xiaobei",
        "Yunjian (Male)": "zm_yunjian",
    }
    # NOTE(review): the selected voice and speed are collected here but are
    # not forwarded to the processing pipeline, which runs with its default
    # voice. Confirm how these settings are meant to reach the TTS engine.
    selected_voice = st.sidebar.selectbox(
        "Select Voice",
        list(voice_options),
    )
    speed = st.sidebar.slider("Speech Speed", 0.5, 2.0, 1.0, 0.1)

    uploaded_file = st.file_uploader(
        "Select Audio File (MP3/WAV)",
        type=["mp3", "wav"],
        accept_multiple_files=False,
    )

    if not uploaded_file:
        return

    logger.info(f"File uploaded: {uploaded_file.name}")

    # The file name is supplied by the client: keep only its final path
    # component so it cannot point outside the uploads directory.
    safe_name = os.path.basename(uploaded_file.name)
    if safe_name in ("", ".", ".."):
        safe_name = "upload"
    upload_path = os.path.join("temp/uploads", safe_name)

    payload = uploaded_file.getbuffer()
    digest = hashlib.sha256(payload).hexdigest()

    cached = (
        st.session_state["pipeline_cache"]
        if "pipeline_cache" in st.session_state
        else None
    )
    # Reuse earlier results only for identical content whose generated
    # audio file (third element of the results tuple) is still on disk.
    reusable = (
        cached is not None
        and cached["digest"] == digest
        and os.path.exists(cached["results"][2])
    )

    if reusable:
        results = cached["results"]
    else:
        with open(upload_path, "wb") as f:
            f.write(payload)
        results = handle_file_processing(upload_path)
        if results:
            st.session_state["pipeline_cache"] = {
                "digest": digest,
                "results": results,
            }

    if results:
        render_results(*results)
if __name__ == "__main__":
main()