Athspi's picture
Update app.py
33b1fce verified
raw
history blame
5.41 kB
import os
import hashlib
import numpy as np
from flask import Flask, request, jsonify, send_file, send_from_directory
import google.generativeai as genai
from gtts import gTTS, lang
import tempfile
import soundfile as sf
from kokoro import KPipeline
from werkzeug.utils import secure_filename
from flask_cors import CORS
from werkzeug.middleware.proxy_fix import ProxyFix
# Flask application; front-end assets are served from the ./static folder.
app = Flask(__name__, static_folder='static')
CORS(app, supports_credentials=True)

# Request-size limit and session settings.
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB
app.config['SECRET_KEY'] = os.urandom(24)  # regenerated on every process start
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'

# Wrap the WSGI callable with ProxyFix (default settings).
app.wsgi_app = ProxyFix(app.wsgi_app)

# Configure the Gemini client from the GEMINI_API_KEY environment variable.
genai.configure(api_key=os.environ.get("GEMINI_API_KEY"))
# Language configurations

# Display name -> Kokoro pipeline language code.
KOKORO_LANGUAGES = {
    "American English": "a",
    "British English": "b",
    "Mandarin Chinese": "z",
    "Spanish": "e",
    "French": "f",
    "Hindi": "h",
    "Italian": "i",
    "Brazilian Portuguese": "p",
}

# gTTS language code -> display name, as reported by gTTS itself.
GTTS_LANGUAGES = lang.tts_langs()
# Explicitly register Japanese under the 'ja' code (adds or overrides the entry).
GTTS_LANGUAGES['ja'] = 'Japanese'

# Sorted, de-duplicated display names from both engines. The original
# expression was missing a closing parenthesis and did not parse.
SUPPORTED_LANGUAGES = sorted(
    set(KOKORO_LANGUAGES) | set(GTTS_LANGUAGES.values())
)
@app.route('/')
def serve_index():
    """Serve ``index.html`` from the application's static folder."""
    static_root = app.static_folder
    return send_from_directory(static_root, 'index.html')
@app.route('/languages')
def get_languages():
    """Return the supported language display names as a JSON array."""
    language_names = SUPPORTED_LANGUAGES
    return jsonify(language_names)
@app.route('/upload-chunk', methods=['POST'])
def upload_chunk():
    """Store one chunk of a chunked upload in a per-upload temp directory.

    Expects multipart form data with:
        file: the chunk bytes.
        chunkIndex: zero-based index of this chunk.
        totalChunks: total number of chunks in the upload.
        fileHash: client-chosen identifier used as the directory name.

    Returns:
        ``{'status': 'success', 'received': <chunkIndex>}`` on success,
        ``{'error': ...}`` with status 400 for invalid input, or status 500
        for unexpected failures.
    """
    try:
        file = request.files.get('file')
        file_hash = request.form.get('fileHash', '')
        if file is None or not file_hash:
            return jsonify({'error': 'file and fileHash are required'}), 400

        try:
            chunk_index = int(request.form['chunkIndex'])
            total_chunks = int(request.form['totalChunks'])
        except (KeyError, ValueError):
            return jsonify(
                {'error': 'chunkIndex and totalChunks must be integers'}
            ), 400
        if not 0 <= chunk_index < total_chunks:
            return jsonify({'error': 'chunkIndex out of range'}), 400

        # fileHash is client-controlled and becomes a directory name, so refuse
        # anything that is not already a safe single path component (this
        # blocks "../" traversal and absolute paths).
        if secure_filename(file_hash) != file_hash:
            return jsonify({'error': 'Invalid fileHash'}), 400

        # Save chunk to temp directory
        chunk_dir = os.path.join(tempfile.gettempdir(), file_hash)
        os.makedirs(chunk_dir, exist_ok=True)
        # Zero-padded names keep chunks ordered when listed and sorted.
        chunk_path = os.path.join(chunk_dir, f"{chunk_index:04d}")
        file.save(chunk_path)
        return jsonify({'status': 'success', 'received': chunk_index})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/process-file', methods=['POST'])
def process_file():
    """Reassemble an uploaded file from its chunks and run the audio pipeline.

    Expects a JSON object with:
        fileHash: identifier of the chunk directory created by the uploads.
        language: target language display name.

    Returns:
        The JSON-encoded result of ``process_audio`` on success,
        ``{'error': ...}`` with status 400 for invalid input, 404 when no
        chunks exist for the hash, or 500 when processing fails.

    The chunk directory and the reassembled file are removed whether or not
    processing succeeds.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({'error': 'A JSON object body is required'}), 400

    file_hash = payload.get('fileHash')
    target_language = payload.get('language')
    if not isinstance(file_hash, str) or not file_hash:
        return jsonify({'error': 'fileHash is required'}), 400
    if not isinstance(target_language, str) or not target_language:
        return jsonify({'error': 'language is required'}), 400
    # fileHash is client-controlled and is used to build filesystem paths, so
    # accept it only if it is already a safe single path component.
    if secure_filename(file_hash) != file_hash:
        return jsonify({'error': 'Invalid fileHash'}), 400

    temp_root = tempfile.gettempdir()
    chunk_dir = os.path.join(temp_root, file_hash)
    final_path = os.path.join(temp_root, file_hash + ".wav")
    if not os.path.isdir(chunk_dir):
        return jsonify({'error': 'No uploaded chunks found'}), 404

    try:
        # Chunk files are named by their decimal index; order numerically so
        # indexes beyond the zero-padding width still sort correctly.
        chunk_names = sorted(
            (name for name in os.listdir(chunk_dir) if name.isdigit()),
            key=int,
        )
        # Reassemble file, streaming in bounded blocks to cap memory use.
        with open(final_path, 'wb') as output_file:
            for chunk_name in chunk_names:
                chunk_path = os.path.join(chunk_dir, chunk_name)
                with open(chunk_path, 'rb') as chunk_file:
                    while True:
                        block = chunk_file.read(1024 * 1024)
                        if not block:
                            break
                        output_file.write(block)

        # Process file
        result = process_audio(final_path, target_language)
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        _discard_upload(chunk_dir, final_path)


def _discard_upload(chunk_dir, final_path):
    """Best-effort removal of the reassembled file and the chunk directory."""
    try:
        leftovers = [
            os.path.join(chunk_dir, name) for name in os.listdir(chunk_dir)
        ]
    except OSError:
        leftovers = []
    for path in [final_path] + leftovers:
        try:
            os.remove(path)
        except OSError:
            pass  # already gone or not removable; nothing more to do here
    try:
        os.rmdir(chunk_dir)
    except OSError:
        pass
def process_audio(file_path, target_language):
    """Transcribe an audio file, translate the text and synthesize speech.

    Args:
        file_path: Path of the audio file to upload to Gemini.
        target_language: Display name of the language to translate into.
            Names present in ``KOKORO_LANGUAGES`` are synthesized with
            Kokoro; any other name is looked up in ``GTTS_LANGUAGES`` and
            synthesized with gTTS (falling back to ``'en'`` when unknown).

    Returns:
        A dict with ``'transcription'``, ``'translation'`` and
        ``'audio_url'`` (a ``/download/<name>`` path for the generated file
        in the temp directory).

    Raises:
        ValueError: If Kokoro produces no audio segments.
    """
    # Transcribe using Gemini
    model = genai.GenerativeModel("gemini-2.0-flash")
    uploaded_file = genai.upload_file(path=file_path)
    try:
        response = model.generate_content(
            ["Transcribe this audio file:", uploaded_file]
        )
        transcription = response.text.strip()

        # Translate
        prompt = f"Translate to {target_language} preserving meaning and cultural nuances. Respond only with the translation:\n\n{transcription}"
        response = model.generate_content(prompt)
        translated_text = response.text.strip()

        # Generate TTS
        if target_language in KOKORO_LANGUAGES:
            lang_code = KOKORO_LANGUAGES[target_language]
            pipeline = KPipeline(lang_code=lang_code)
            # NOTE(review): the same voice is used for every Kokoro language;
            # confirm "af_heart" is appropriate for non-English targets.
            generator = pipeline(translated_text, voice="af_heart", speed=1)

            # Collect all audio segments
            audio_segments = [
                audio for _, _, audio in generator if audio is not None
            ]
            if not audio_segments:
                raise ValueError("No audio generated by Kokoro")

            audio_data = np.concatenate(audio_segments)
            fd, temp_output_path = tempfile.mkstemp(suffix=".wav")
            # mkstemp returns an open descriptor; close it so it is not
            # leaked, since the file is reopened by path below.
            os.close(fd)
            sf.write(temp_output_path, audio_data, 24000)
        else:
            # Fallback to gTTS
            lang_code = next(
                (k for k, v in GTTS_LANGUAGES.items() if v == target_language),
                'en',
            )
            tts = gTTS(translated_text, lang=lang_code)
            fd, temp_output_path = tempfile.mkstemp(suffix=".mp3")
            os.close(fd)  # see above: avoid leaking the descriptor
            tts.save(temp_output_path)

        return {
            'transcription': transcription,
            'translation': translated_text,
            'audio_url': f'/download/{os.path.basename(temp_output_path)}'
        }
    finally:
        # Always remove the file uploaded to Gemini, even on failure.
        uploaded_file.delete()
@app.route('/download/<filename>')
def download_file(filename):
    """Send a generated audio file from the temp directory as an attachment.

    Args:
        filename: Base name of a ``.wav`` or ``.mp3`` file in the temp
            directory, as returned in ``audio_url`` by the processing step.

    Returns:
        The file as an attachment named ``translated_<filename>`` with a
        MIME type matching its extension, or ``{'error': 'File not found'}``
        with status 404 when the name is unsafe, has another extension, or
        the file does not exist.
    """
    # The pipeline writes WAV (Kokoro) or MP3 (gTTS); serve each with its own
    # MIME type instead of labelling everything as audio/mpeg.
    mimetype_by_extension = {'.wav': 'audio/wav', '.mp3': 'audio/mpeg'}
    extension = os.path.splitext(filename)[1].lower()

    # filename comes from the URL: only accept a safe single path component
    # with a known audio extension so arbitrary temp files are not exposed.
    if (
        secure_filename(filename) != filename
        or extension not in mimetype_by_extension
    ):
        return jsonify({'error': 'File not found'}), 404

    try:
        return send_file(
            os.path.join(tempfile.gettempdir(), filename),
            mimetype=mimetype_by_extension[extension],
            as_attachment=True,
            download_name=f"translated_{filename}"
        )
    except FileNotFoundError:
        return jsonify({'error': 'File not found'}), 404
if __name__ == '__main__':
    # The Werkzeug debugger permits arbitrary code execution, so it must not
    # be enabled unconditionally while listening on all interfaces. Opt in
    # explicitly for local development with FLASK_DEBUG=1.
    debug_enabled = os.getenv('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)