# app.py — Flask application for text-to-speech and multi-voice podcast
# generation using the Gemini Live API.
import asyncio
import json
import logging
import os
import tempfile
import threading
import time
import wave

import aiohttp
from flask import Flask, render_template, request, jsonify, send_file, stream_with_context, Response
from google import genai
from pydub import AudioSegment
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

app = Flask(__name__)
# Session secret comes from the environment; the fallback is for local
# development only and must not be used in production.
app.secret_key = os.environ.get("SESSION_SECRET", "default-secret-key")

# Configure Gemini API
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
    logger.warning("GEMINI_API_KEY not found in environment variables. Using default value for development.")
    api_key = "YOUR_API_KEY"  # This will be replaced with env var in production

# Define available voices (prebuilt Gemini Live voice names accepted by the API)
AVAILABLE_VOICES = [
    "Puck", "Charon", "Kore", "Fenrir",
    "Aoede", "Leda", "Orus", "Zephyr"
]

# Language tag passed to SpeechConfig for all synthesis requests.
language_code = "fr-FR"

# Global variable to track generation progress.
# Single-process state shared between the background podcast task and the
# polling routes (/podcast-status, /generation-progress).
generation_progress = {
    "status": "idle",   # "idle" | "in_progress" | "complete" (set by update_progress)
    "current": 0,       # steps completed so far
    "total": 0,         # total steps expected
    "message": ""       # human-readable status shown in the UI (French)
}
def update_progress(current, total, message):
    """Replace the module-level progress tracker with a fresh snapshot.

    The dict is rebuilt wholesale (rather than mutated) so readers always
    observe a consistent state. Status becomes "complete" once *current*
    reaches *total* — including the 0/0 case used for error reports.
    """
    global generation_progress
    status = "complete" if current >= total else "in_progress"
    generation_progress = {
        "status": status,
        "current": current,
        "total": total,
        "message": message,
    }
def create_async_enumerate(async_iterator):
    """Pair each item of *async_iterator* with its 0-based index.

    ``enumerate`` does not accept async iterables, so this wraps one in an
    async generator yielding ``(index, item)`` tuples.
    """
    async def _indexed():
        position = 0
        async for element in async_iterator:
            yield position, element
            position += 1
    return _indexed()
async def generate_speech(text, selected_voice):
    """Synthesize *text* to a temporary WAV file via the Gemini Live API.

    Streams PCM chunks from a live session and writes them into a WAV file
    (mono, 16-bit, 24 kHz — the format the Live API emits audio in).

    Args:
        text: Text to synthesize.
        selected_voice: Prebuilt Gemini voice name (see AVAILABLE_VOICES).

    Returns:
        Path of the temporary .wav file. The caller is responsible for
        serving and/or deleting it.

    Raises:
        Exception: any client/session error is logged and re-raised.
    """
    try:
        client = genai.Client(api_key=api_key)
        model = "gemini-2.0-flash-live-001"

        # Voice and language configuration for the live session.
        speech_config = genai.types.SpeechConfig(
            language_code=language_code,
            voice_config=genai.types.VoiceConfig(
                prebuilt_voice_config=genai.types.PrebuiltVoiceConfig(
                    voice_name=selected_voice
                )
            )
        )
        config = genai.types.LiveConnectConfig(
            response_modalities=["AUDIO"],
            speech_config=speech_config
        )

        # delete=False so the file survives this function and can be served
        # later by the /audio route.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
            temp_filename = tmp_file.name

        async with client.aio.live.connect(model=model, config=config) as session:
            # Context manager guarantees the WAV header is finalized and the
            # handle closed even if the stream raises mid-way (the original
            # leaked the handle on error).
            with wave.open(temp_filename, "wb") as wf:
                wf.setnchannels(1)      # mono
                wf.setsampwidth(2)      # 16-bit samples
                wf.setframerate(24000)  # Live API audio sample rate

                # Send the full text as one completed user turn.
                await session.send_client_content(
                    turns={"role": "user", "parts": [{"text": text}]},
                    turn_complete=True
                )

                # Write audio chunks to the file as they arrive. (The index
                # from create_async_enumerate was never used — dropped.)
                async for response in session.receive():
                    if response.data is not None:
                        wf.writeframes(response.data)

        return temp_filename
    except Exception as e:
        logger.error(f"Error generating speech: {str(e)}")
        raise  # bare raise preserves the original traceback
@app.route('/')
def index():
    """Render the main page, passing the selectable Gemini voice names."""
    return render_template('index.html', voices=AVAILABLE_VOICES)
@app.route('/generate', methods=['POST'])
async def generate():
    """Generate speech from the posted JSON payload.

    Expects ``{"text": ..., "voice": ...}``.

    Responses:
        200: {"status", "message", "audioUrl"} — URL under /audio/ serving
             the generated WAV.
        400: missing/empty text, invalid voice, or malformed JSON body.
        500: any error raised during synthesis.
    """
    try:
        # silent=True: a missing or malformed JSON body yields None instead
        # of raising, so the client gets a 400 rather than a generic 500.
        data = request.get_json(silent=True) or {}
        text = data.get('text', '')
        voice = data.get('voice', 'Kore')  # default voice

        if not text:
            return jsonify({"error": "Text is required"}), 400
        if voice not in AVAILABLE_VOICES:
            return jsonify({"error": "Invalid voice selection"}), 400

        # Generate the speech
        audio_file = await generate_speech(text, voice)
        return jsonify({
            "status": "success",
            "message": "Audio generated successfully",
            "audioUrl": f"/audio/{os.path.basename(audio_file)}"
        })
    except Exception as e:
        logger.error(f"Error in generate endpoint: {str(e)}")
        return jsonify({"error": str(e)}), 500
@app.route('/audio/<filename>')
def get_audio(filename):
    """Serve a generated audio file from the system temp directory.

    *filename* comes from the URL and is untrusted: it is reduced to its
    base name and the resolved path is verified to stay inside the temp
    directory before reading — prevents path traversal.
    """
    try:
        temp_dir = os.path.realpath(tempfile.gettempdir())
        # Strip any directory components an attacker may have smuggled in.
        safe_name = os.path.basename(filename)
        file_path = os.path.realpath(os.path.join(temp_dir, safe_name))
        if not file_path.startswith(temp_dir + os.sep) or not os.path.exists(file_path):
            return jsonify({"error": "Audio file not found"}), 404
        return send_file(file_path, mimetype="audio/wav", as_attachment=False)
    except Exception as e:
        logger.error(f"Error serving audio file: {str(e)}")
        return jsonify({"error": str(e)}), 500
@app.route('/generate-podcast', methods=['POST'])
async def generate_podcast_route():
    """Start podcast generation in the background and return immediately.

    The client polls /podcast-status (or /generation-progress) to follow
    the generation and retrieve the final audio URL.
    """
    try:
        # silent=True: a malformed/missing JSON body becomes an empty
        # scenario instead of raising, avoiding an unhelpful 500.
        scenario = request.get_json(silent=True) or {}

        # Reset progress tracker
        global generation_progress
        generation_progress = {
            "status": "in_progress",
            "current": 0,
            "total": len(scenario.get('characters', [])),
            "message": "Démarrage de la génération..."
        }

        # Run the generation on its own thread with its own event loop.
        # asyncio.create_task would tie the task to this request's event
        # loop, which Flask closes as soon as the view returns — the
        # background work would be abandoned mid-flight.
        threading.Thread(
            target=lambda: asyncio.run(generate_podcast_background(scenario)),
            daemon=True,
        ).start()

        return jsonify({
            "status": "started",
            "message": "Génération du podcast commencée. Suivez la progression sur l'interface."
        })
    except Exception as e:
        logger.error(f"Error in generate-podcast endpoint: {str(e)}")
        update_progress(0, 0, f"Erreur: {str(e)}")
        return jsonify({"error": str(e)}), 500
async def generate_podcast_background(scenario):
    """Generate a multi-voice podcast from *scenario* in the background.

    For each entry in ``scenario["characters"]`` ({"name", "voice", "text"}),
    synthesizes one audio segment, then concatenates all segments (with
    500 ms of silence after each) into a single WAV. Progress and the final
    file name are reported through update_progress; errors are reported the
    same way rather than raised.
    """
    audio_segments = []
    try:
        characters = scenario.get('characters', [])
        total_characters = len(characters)
        update_progress(0, total_characters, f"Préparation du podcast avec {total_characters} personnages...")

        for idx, character in enumerate(characters):
            character_name = character.get('name', 'Unknown')
            voice = character.get('voice', 'Kore')
            text = character.get('text', '')
            update_progress(idx, total_characters, f"Génération de l'audio pour {character_name} ({idx+1}/{total_characters})...")
            if voice not in AVAILABLE_VOICES:
                logger.warning(f"Voice {voice} not available. Using default voice Kore for {character_name}.")
                voice = 'Kore'
            # Generate speech for this character; abort the whole podcast on
            # the first failure (progress carries the error message).
            try:
                audio_file = await generate_speech(text, voice)
                audio_segments.append(audio_file)
            except Exception as e:
                logger.error(f"Error generating speech for {character_name}: {str(e)}")
                update_progress(0, 0, f"Erreur lors de la génération pour {character_name}: {str(e)}")
                return

        update_progress(total_characters, total_characters, "Assemblage des segments audio...")

        # Combine all segments, inserting a short pause (500 ms) after each.
        combined = AudioSegment.empty()
        for audio_file in audio_segments:
            combined += AudioSegment.from_wav(audio_file)
            combined += AudioSegment.silent(duration=500)

        # Reserve a temp file name, close the handle, then export into it
        # (exporting into an already-open handle is fragile on Windows).
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as output_file:
            podcast_filename = output_file.name
        combined.export(podcast_filename, format="wav")

        # The "audio:<basename>" suffix is parsed by /podcast-status.
        update_progress(total_characters + 1, total_characters + 1, f"Podcast généré avec succès! audio:{os.path.basename(podcast_filename)}")
    except Exception as e:
        logger.error(f"Error in podcast background task: {str(e)}")
        update_progress(0, 0, f"Erreur: {str(e)}")
    finally:
        # Per-character WAVs are only needed until they are merged; the
        # original leaked them in the temp directory on every run.
        for audio_file in audio_segments:
            try:
                os.remove(audio_file)
            except OSError:
                pass
@app.route('/podcast-status')
def podcast_status():
    """Report podcast-generation state; surface the audio URL when done.

    When generation is complete, generate_podcast_background embeds the
    output file name in the progress message as "...audio:<basename>";
    that name is extracted here and returned as an /audio/ URL.
    """
    progress = generation_progress
    done = progress["status"] == "complete"
    if done and "audio:" in progress["message"]:
        message_parts = progress["message"].split("audio:")
        if len(message_parts) > 1:
            return jsonify({
                "status": "complete",
                "message": message_parts[0].strip(),
                "audioUrl": f"/audio/{message_parts[1].strip()}"
            })
    # Not complete (or no embedded file name): return raw progress.
    return jsonify(progress)
@app.route('/generation-progress')
def get_generation_progress():
    """Return the raw generation_progress dict for client-side polling."""
    return jsonify(generation_progress)
@app.route('/download/<filename>')
def download_audio(filename):
    """Download a generated audio file as an attachment.

    Same hardening as /audio/: *filename* is untrusted, so it is reduced
    to its base name and the resolved path must stay inside the temp
    directory (prevents path traversal).
    """
    try:
        temp_dir = os.path.realpath(tempfile.gettempdir())
        safe_name = os.path.basename(filename)
        file_path = os.path.realpath(os.path.join(temp_dir, safe_name))
        if not file_path.startswith(temp_dir + os.sep) or not os.path.exists(file_path):
            return jsonify({"error": "Audio file not found"}), 404
        # Fixed attachment name regardless of the temp file's random name.
        download_name = "gemini_podcast.wav"
        return send_file(file_path, mimetype="audio/wav", as_attachment=True,
                         download_name=download_name)
    except Exception as e:
        logger.error(f"Error downloading audio file: {str(e)}")
        return jsonify({"error": str(e)}), 500