Spaces:
Running
Running
import os | |
import numpy as np | |
from flask import Flask, request, jsonify, send_file, send_from_directory | |
import google.generativeai as genai | |
from gtts import gTTS, lang | |
import tempfile | |
import soundfile as sf | |
from kokoro import KPipeline | |
from werkzeug.utils import secure_filename | |
from flask_cors import CORS | |
# Flask application; static assets are served from the local "static" folder.
app = Flask(__name__, static_folder='static')
CORS(app)
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB limit

# Configure Gemini API
# Fail fast at import time: nothing below can work without the API key.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY environment variable not set")
genai.configure(api_key=GEMINI_API_KEY)

# Language configurations
# Display name -> Kokoro pipeline language code.
KOKORO_LANGUAGES = {
    "American English": "a",
    "British English": "b",
    "Mandarin Chinese": "z",
    "Spanish": "e",
    "French": "f",
    "Hindi": "h",
    "Italian": "i",
    "Brazilian Portuguese": "p",
}
# gTTS language code -> display name, extended with Japanese.
GTTS_LANGUAGES = lang.tts_langs()
GTTS_LANGUAGES['ja'] = 'Japanese'
# Sorted, de-duplicated display names offered by either TTS backend.
# (The original expression had unbalanced parentheses and did not parse.)
SUPPORTED_LANGUAGES = sorted(
    set(KOKORO_LANGUAGES) | set(GTTS_LANGUAGES.values())
)

MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB Gemini limit
CHUNK_SIZE = 20 * 1024 * 1024  # 20MB chunks
def process_large_audio(file_path):
    """Upload an audio file to Gemini, splitting it into chunks when large.

    A file of at most ``MAX_FILE_SIZE`` bytes is uploaded as-is. A larger
    file is cut into consecutive pieces of up to ``CHUNK_SIZE`` bytes; each
    piece is written to a temporary file next to ``file_path``, uploaded,
    and then removed from local disk.

    NOTE(review): the split is on raw bytes, not on audio frames, so pieces
    after the first may lack container headers and may not be decodable on
    their own -- confirm this works for the formats you accept.

    Args:
        file_path: Path of the audio file on local disk.

    Returns:
        A list of the objects returned by ``genai.upload_file``, in file
        order (a single-element list for small files).

    Raises:
        RuntimeError: If sizing, reading, writing or uploading fails. Any
            chunk uploaded before the failure is deleted again
            (best effort) so it does not leak.
    """
    uploaded = []
    try:
        if os.path.getsize(file_path) <= MAX_FILE_SIZE:
            # Process small files normally
            uploaded.append(genai.upload_file(file_path))
            return uploaded

        # Split large files into chunks. Keep the original extension on the
        # chunk names; presumably the upload infers the MIME type from it.
        root, ext = os.path.splitext(file_path)
        with open(file_path, 'rb') as source:
            chunk_num = 0
            while chunk_data := source.read(CHUNK_SIZE):
                chunk_path = f"{root}_chunk_{chunk_num}{ext}"
                try:
                    with open(chunk_path, 'wb') as chunk_file:
                        chunk_file.write(chunk_data)
                    uploaded.append(genai.upload_file(chunk_path))
                finally:
                    # The local copy is only needed for the upload itself.
                    if os.path.exists(chunk_path):
                        os.remove(chunk_path)
                chunk_num += 1
        return uploaded
    except Exception as e:
        # The caller never receives the partial list, so roll back here.
        for remote in uploaded:
            try:
                remote.delete()
            except Exception as rollback_error:
                app.logger.warning(f"Chunk rollback failed: {rollback_error}")
        raise RuntimeError(f"File processing failed: {str(e)}") from e
def cleanup_files(file_path, chunks):
    """Remove a temporary local file and delete uploaded chunks.

    Every step is best effort: a failure is logged through ``app.logger``
    and the remaining items are still processed. The function never raises,
    so it is safe to call from a ``finally`` clause.

    Args:
        file_path: Local path to remove, or ``None``/empty when no file was
            created (for example when the request was rejected early).
        chunks: Iterable of uploaded-file objects exposing ``name`` and
            ``delete()``; may be empty or ``None``.
    """
    if file_path:
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except Exception as e:
            app.logger.error(f"Cleanup error: {str(e)}")

    for chunk in chunks or ():
        # Handle each chunk separately so one failed delete (for example a
        # chunk that is already gone) does not leave the others behind.
        try:
            # NOTE(review): ``chunk.name`` is presumably the remote resource
            # name rather than a local path; the local check is kept from
            # the original as a harmless best effort -- confirm.
            if os.path.exists(chunk.name):
                os.remove(chunk.name)
            chunk.delete()
        except Exception as e:
            app.logger.error(f"Cleanup error: {str(e)}")
def _synthesize_speech(text, target_language):
    """Render ``text`` to an audio file in the temp directory; return its path.

    Uses Kokoro (WAV, 24 kHz) when ``target_language`` is a key of
    ``KOKORO_LANGUAGES``; otherwise uses gTTS (MP3), falling back to the
    ``'en'`` language code when the name is not in ``GTTS_LANGUAGES``.

    Raises:
        ValueError: If Kokoro yields no audio segments.
    """
    if target_language in KOKORO_LANGUAGES:
        # Kokoro processing
        pipeline = KPipeline(lang_code=KOKORO_LANGUAGES[target_language])
        # NOTE(review): the same voice id is used for every Kokoro language;
        # confirm it is appropriate for non-English output.
        generator = pipeline(text, voice="af_heart", speed=1)
        audio_segments = [audio for _, _, audio in generator if audio is not None]
        if not audio_segments:
            raise ValueError("No audio generated by Kokoro")
        audio_data = np.concatenate(audio_segments)
        fd, output_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)  # mkstemp returns an open descriptor; only the path is needed
        sf.write(output_path, audio_data, 24000)
        return output_path

    # gTTS processing
    lang_code = next((k for k, v in GTTS_LANGUAGES.items() if v == target_language), 'en')
    tts = gTTS(text, lang=lang_code)
    fd, output_path = tempfile.mkstemp(suffix=".mp3")
    os.close(fd)  # mkstemp returns an open descriptor; only the path is needed
    tts.save(output_path)
    return output_path


def translate_audio():
    """Transcribe an uploaded audio file, translate it and synthesize speech.

    Reads the ``audio`` file and the optional ``language`` form field
    (default ``'English'``) from the current Flask request.

    NOTE(review): no route decorator is visible on this function; presumably
    it is registered with the app elsewhere -- confirm.

    Returns:
        On success, a JSON response with ``transcription``, ``translation``
        and ``audio_url`` (a ``/download/<file>`` path). On a missing or
        unnamed upload, a JSON error with status 400. On any other failure,
        a JSON error with status 500.
    """
    temp_path = None
    uploaded_chunks = []
    try:
        if 'audio' not in request.files:
            return jsonify({'error': 'No audio file uploaded'}), 400
        audio_file = request.files['audio']
        target_language = request.form.get('language', 'English')
        if not audio_file or audio_file.filename == '':
            return jsonify({'error': 'Invalid audio file'}), 400

        # Save to a uniquely named temp file. Only the sanitized extension of
        # the client-supplied name is kept, so concurrent uploads with the
        # same name cannot overwrite each other and an empty sanitized name
        # cannot produce an unusable path.
        extension = os.path.splitext(secure_filename(audio_file.filename))[1]
        fd, temp_path = tempfile.mkstemp(suffix=extension)
        os.close(fd)
        audio_file.save(temp_path)

        # Process file in chunks if needed
        uploaded_chunks = process_large_audio(temp_path)

        # Transcribe chunks. Uploaded chunks are deleted once, in the
        # ``finally`` clause below, instead of here and there.
        model = genai.GenerativeModel("gemini-2.0-flash")
        transcripts = []
        for chunk in uploaded_chunks:
            response = model.generate_content(
                ["Transcribe this audio chunk verbatim. Respond only with the transcription:", chunk]
            )
            transcripts.append(response.text.strip())
        transcription = " ".join(transcripts)

        # Translation
        prompt = f"Translate to {target_language} preserving meaning:\n\n{transcription}"
        response = model.generate_content(prompt)
        translated_text = response.text.strip()

        # TTS Generation
        output_path = _synthesize_speech(translated_text, target_language)

        return jsonify({
            'transcription': transcription,
            'translation': translated_text,
            'audio_url': f'/download/{os.path.basename(output_path)}'
        })
    except Exception as e:
        # Top-level request boundary: log with traceback, report as HTTP 500.
        app.logger.exception(f"Processing error: {str(e)}")
        return jsonify({'error': str(e)}), 500
    finally:
        # Nothing to clean up when the request was rejected before saving.
        if temp_path or uploaded_chunks:
            cleanup_files(temp_path, uploaded_chunks)
def download_file(filename):
    """Send a generated audio file from the temp directory as an attachment.

    NOTE(review): no route decorator is visible on this function; presumably
    it is registered for ``/download/<filename>`` elsewhere -- confirm.

    Args:
        filename: Base name of a file inside ``tempfile.gettempdir()``.

    Returns:
        The file response, downloaded as ``translated_<filename>``, or a
        JSON error with status 404 when the file cannot be served.
    """
    try:
        # WAV output must not be labelled as MPEG audio; everything else
        # keeps the original "audio/mpeg" type.
        if filename.lower().endswith(".wav"):
            mimetype = "audio/wav"
        else:
            mimetype = "audio/mpeg"
        # send_from_directory joins the path safely, so the name cannot
        # escape the temp directory.
        return send_from_directory(
            tempfile.gettempdir(),
            filename,
            mimetype=mimetype,
            as_attachment=True,
            download_name=f"translated_{filename}",
        )
    except Exception as e:
        return jsonify({'error': str(e)}), 404
if __name__ == '__main__':
    # The server listens on all interfaces, and Flask's debug mode exposes an
    # interactive debugger that can execute arbitrary code. Keep it off
    # unless explicitly requested through the FLASK_DEBUG environment variable.
    debug_enabled = os.getenv("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)