import os
import asyncio
import threading
import wave
import tempfile
import logging
import json
import time
from flask import Flask, render_template, request, jsonify, send_file, stream_with_context, Response
from google import genai
import aiohttp
from pydub import AudioSegment

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

app = Flask(__name__)
app.secret_key = os.environ.get("SESSION_SECRET", "default-secret-key")

# Configure Gemini API
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
    logger.warning("GEMINI_API_KEY not found in environment variables. Using default value for development.")
    api_key = "YOUR_API_KEY"  # This will be replaced with env var in production

# Define available voices
AVAILABLE_VOICES = [
    "Puck", "Charon", "Kore", "Fenrir", "Aoede", "Leda", "Orus", "Zephyr"
]

# BCP-47 language code used for every synthesis request.
language_code = "fr-FR"

# Global variable to track generation progress.
# NOTE(review): module-global state assumes a single concurrent generation;
# two simultaneous requests would overwrite each other's progress.
generation_progress = {
    "status": "idle",
    "current": 0,
    "total": 0,
    "message": ""
}


def update_progress(current, total, message):
    """Update the global progress tracker.

    Status becomes "complete" as soon as current >= total (including the
    current=0, total=0 case used for error reporting).
    """
    global generation_progress
    generation_progress = {
        "status": "in_progress" if current < total else "complete",
        "current": current,
        "total": total,
        "message": message
    }


def create_async_enumerate(async_iterator):
    """Create an async enumerate function since it's not built-in.

    Yields (index, item) pairs from *async_iterator*, starting at 0.
    """
    i = 0

    async def async_iter():
        nonlocal i
        async for item in async_iterator:
            yield i, item
            i += 1

    return async_iter()


async def generate_speech(text, selected_voice):
    """Generate speech from text using Gemini AI.

    Args:
        text: Text to synthesize.
        selected_voice: One of AVAILABLE_VOICES.

    Returns:
        Path to a temporary WAV file (mono, 16-bit, 24 kHz) holding the audio.
        The caller is responsible for deleting the file.

    Raises:
        Exception: Any error from the Gemini live API is logged and re-raised.
    """
    try:
        client = genai.Client(api_key=api_key)
        model = "gemini-2.0-flash-live-001"

        # Configure the voice settings
        speech_config = genai.types.SpeechConfig(
            language_code=language_code,
            voice_config=genai.types.VoiceConfig(
                prebuilt_voice_config=genai.types.PrebuiltVoiceConfig(
                    voice_name=selected_voice
                )
            )
        )
        config = genai.types.LiveConnectConfig(
            response_modalities=["AUDIO"],
            speech_config=speech_config
        )

        # Create a temporary file to store the audio (delete=False so it
        # survives for the later /audio/<filename> request).
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
            temp_filename = tmp_file.name

        async with client.aio.live.connect(model=model, config=config) as session:
            # FIX: use wave.open as a context manager so the WAV handle is
            # closed even if the stream raises mid-generation (the original
            # leaked the handle on error).
            with wave.open(temp_filename, "wb") as wf:
                wf.setnchannels(1)      # mono
                wf.setsampwidth(2)      # 16-bit PCM
                wf.setframerate(24000)  # sample rate of Gemini live audio

                # Send the text to Gemini
                await session.send_client_content(
                    turns={"role": "user", "parts": [{"text": text}]},
                    turn_complete=True
                )

                # Receive the audio data and write it to the file
                async for idx, response in create_async_enumerate(session.receive()):
                    if response.data is not None:
                        wf.writeframes(response.data)

        return temp_filename
    except Exception as e:
        logger.error(f"Error generating speech: {str(e)}")
        # Bare raise preserves the original traceback.
        raise


@app.route('/')
def index():
    """Render the main page."""
    return render_template('index.html', voices=AVAILABLE_VOICES)


@app.route('/generate', methods=['POST'])
async def generate():
    """Generate speech from text.

    Expects JSON {"text": str, "voice": str}; returns a JSON payload with an
    audioUrl pointing at /audio/<basename> on success.
    """
    try:
        data = request.json
        text = data.get('text', '')
        voice = data.get('voice', 'Kore')  # Default voice

        if not text:
            return jsonify({"error": "Text is required"}), 400

        if voice not in AVAILABLE_VOICES:
            return jsonify({"error": "Invalid voice selection"}), 400

        # Generate the speech
        audio_file = await generate_speech(text, voice)

        return jsonify({
            "status": "success",
            "message": "Audio generated successfully",
            "audioUrl": f"/audio/{os.path.basename(audio_file)}"
        })
    except Exception as e:
        logger.error(f"Error in generate endpoint: {str(e)}")
        return jsonify({"error": str(e)}), 500


# BUG FIX: the route was '/audio/' with no <filename> converter, so Flask
# could never supply the view's `filename` argument; /audio/<basename> (as
# generated by /generate and /podcast-status) would 404.
@app.route('/audio/<filename>')
def get_audio(filename):
    """Serve the generated audio file from the system temp directory."""
    try:
        temp_dir = tempfile.gettempdir()
        # basename() guards against path traversal in the client-supplied name.
        file_path = os.path.join(temp_dir, os.path.basename(filename))

        if not os.path.exists(file_path):
            return jsonify({"error": "Audio file not found"}), 404

        return send_file(file_path, mimetype="audio/wav", as_attachment=False)
    except Exception as e:
        logger.error(f"Error serving audio file: {str(e)}")
        return jsonify({"error": str(e)}), 500


@app.route('/generate-podcast', methods=['POST'])
async def generate_podcast_route():
    """Generate a podcast from a scenario.

    Kicks off background generation and returns immediately; the client polls
    /podcast-status (or /generation-progress) for progress.
    """
    try:
        scenario = request.json

        # Reset progress tracker
        global generation_progress
        generation_progress = {
            "status": "in_progress",
            "current": 0,
            "total": len(scenario.get('characters', [])),
            "message": "Démarrage de la génération..."
        }

        # BUG FIX: asyncio.create_task() scheduled the job on the per-request
        # event loop, which Flask tears down as soon as this view returns, so
        # the background task was cancelled before doing any work. Run it on
        # its own event loop in a daemon thread instead.
        threading.Thread(
            target=lambda: asyncio.run(generate_podcast_background(scenario)),
            daemon=True,
        ).start()

        return jsonify({
            "status": "started",
            "message": "Génération du podcast commencée. Suivez la progression sur l'interface."
        })
    except Exception as e:
        logger.error(f"Error in generate-podcast endpoint: {str(e)}")
        update_progress(0, 0, f"Erreur: {str(e)}")
        return jsonify({"error": str(e)}), 500


async def generate_podcast_background(scenario):
    """Generate a podcast in the background.

    Synthesizes one segment per character in scenario['characters'], joins
    them with 500 ms of silence between segments, and exports a single WAV.
    Progress is reported through the global tracker; the final message embeds
    the output basename as "audio:<name>" for /podcast-status to extract.
    """
    try:
        # Generate audio for each character
        characters = scenario.get('characters', [])
        total_characters = len(characters)
        update_progress(0, total_characters,
                        f"Préparation du podcast avec {total_characters} personnages...")

        audio_segments = []
        podcast_filename = None

        for idx, character in enumerate(characters):
            character_name = character.get('name', 'Unknown')
            voice = character.get('voice', 'Kore')
            text = character.get('text', '')

            update_progress(idx, total_characters,
                            f"Génération de l'audio pour {character_name} ({idx+1}/{total_characters})...")

            if voice not in AVAILABLE_VOICES:
                logger.warning(f"Voice {voice} not available. Using default voice Kore for {character_name}.")
                voice = 'Kore'

            # Generate speech for this character
            try:
                audio_file = await generate_speech(text, voice)
                audio_segments.append(audio_file)
            except Exception as e:
                logger.error(f"Error generating speech for {character_name}: {str(e)}")
                update_progress(0, 0, f"Erreur lors de la génération pour {character_name}: {str(e)}")
                return

        # NOTE(review): current == total here marks status "complete" while
        # assembly is still running; clients relying on "audio:" in the final
        # message (as /podcast-status does) are unaffected.
        update_progress(total_characters, total_characters, "Assemblage des segments audio...")

        # Combine all audio segments into one file
        combined = AudioSegment.empty()
        for audio_file in audio_segments:
            segment = AudioSegment.from_wav(audio_file)
            combined += segment
            # Add a short silence between segments (500ms)
            combined += AudioSegment.silent(duration=500)

        # Export the combined audio
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as output_file:
            podcast_filename = output_file.name
        combined.export(podcast_filename, format="wav")

        update_progress(total_characters + 1, total_characters + 1,
                        f"Podcast généré avec succès! audio:{os.path.basename(podcast_filename)}")
    except Exception as e:
        logger.error(f"Error in podcast background task: {str(e)}")
        update_progress(0, 0, f"Erreur: {str(e)}")


@app.route('/podcast-status')
def podcast_status():
    """Get the current status of the podcast generation.

    When generation is complete the progress message contains the marker
    "audio:<basename>"; extract it into a separate audioUrl field.
    """
    global generation_progress

    # If status is complete and contains an audioUrl in the message, extract it
    if generation_progress["status"] == "complete" and "audio:" in generation_progress["message"]:
        message_parts = generation_progress["message"].split("audio:")
        if len(message_parts) > 1:
            audio_filename = message_parts[1].strip()
            return jsonify({
                "status": "complete",
                "message": message_parts[0].strip(),
                "audioUrl": f"/audio/{audio_filename}"
            })

    # Otherwise just return the current progress
    return jsonify(generation_progress)


@app.route('/generation-progress')
def get_generation_progress():
    """Get the current progress of podcast generation."""
    return jsonify(generation_progress)


# BUG FIX: the route was '/download/' with no <filename> converter, so the
# view's `filename` argument could never be supplied (same defect as /audio/).
@app.route('/download/<filename>')
def download_audio(filename):
    """Download the generated audio file as an attachment."""
    try:
        temp_dir = tempfile.gettempdir()
        # basename() guards against path traversal in the client-supplied name.
        file_path = os.path.join(temp_dir, os.path.basename(filename))

        if not os.path.exists(file_path):
            return jsonify({"error": "Audio file not found"}), 404

        # Check if this is a podcast or simple speech
        download_name = "gemini_podcast.wav"

        return send_file(file_path, mimetype="audio/wav", as_attachment=True,
                         download_name=download_name)
    except Exception as e:
        logger.error(f"Error downloading audio file: {str(e)}")
        return jsonify({"error": str(e)}), 500