import streamlit as st
import time
import re
import os
import tempfile
import pypdf
from pydub import AudioSegment, effects
import difflib

from utils import (
    generate_script,
    generate_audio_mp3,
    mix_with_bg_music,
    DialogueItem,
    run_research_agent,
    generate_report
)
from prompts import SYSTEM_PROMPT
from qa import transcribe_audio_deepgram, handle_qa_exchange

MAX_QA_QUESTIONS = 5  # up to 5 voice/text questions


def _resolve_speaker(raw_name: str, host_name: str, guest_name: str) -> str:
    """Map a display name onto one of the two TTS voice roles.

    Host names (and anything unrecognized) map to "Jane"; the guest name
    maps to "John". Host takes precedence when the names collide.
    """
    if raw_name.lower() == host_name.lower():
        return "Jane"
    if raw_name.lower() == guest_name.lower():
        return "John"
    return "Jane"


def parse_user_edited_transcript(edited_text: str, host_name: str, guest_name: str):
    """Parse a '**Name**: line' markdown transcript back into DialogueItems.

    If no '**Name**:' markers are found, the entire text is treated as a
    single utterance attributed to the host (or the guest voice, when the
    host name equals the guest name — preserving the original fallback).
    """
    pattern = r"\*\*(.+?)\*\*:\s*(.+)"
    matches = re.findall(pattern, edited_text)

    if not matches:
        raw_name = host_name or "Jane"
        # Original fallback rule: only the guest-name collision flips the voice.
        speaker = "John" if raw_name.lower() == guest_name.lower() else "Jane"
        return [
            DialogueItem(
                speaker=speaker,
                display_speaker=raw_name,
                text=edited_text.strip()
            )
        ]

    return [
        DialogueItem(
            speaker=_resolve_speaker(raw_name, host_name, guest_name),
            display_speaker=raw_name,
            text=text_line
        )
        for (raw_name, text_line) in matches
    ]


def regenerate_audio_from_dialogue(dialogue_items, custom_bg_music_path=None):
    """Re-synthesize audio for (possibly user-edited) dialogue items.

    Returns (audio_bytes, transcript) on success, or (None, error_message)
    when no audio could be produced.
    """
    audio_segments = []
    transcript = ""
    crossfade_duration = 50  # ms of overlap between consecutive lines

    for item in dialogue_items:
        audio_file = generate_audio_mp3(item.text, item.speaker)
        try:
            seg = AudioSegment.from_file(audio_file, format="mp3")
        finally:
            # Never leak the per-line TTS temp file, even if decoding fails.
            os.remove(audio_file)
        audio_segments.append(seg)
        transcript += f"**{item.display_speaker}**: {item.text}\n\n"

    if not audio_segments:
        return None, "No audio segments were generated."

    combined_spoken = audio_segments[0]
    for seg in audio_segments[1:]:
        # pydub raises ValueError when the crossfade exceeds either segment's
        # length, so clamp it for very short utterances.
        fade = min(crossfade_duration, len(combined_spoken), len(seg))
        combined_spoken = combined_spoken.append(seg, crossfade=fade)

    final_mix = mix_with_bg_music(combined_spoken, custom_bg_music_path)

    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
        final_mix.export(temp_audio.name, format="mp3")
        final_mp3_path = temp_audio.name
    try:
        with open(final_mp3_path, "rb") as f:
            audio_bytes = f.read()
    finally:
        os.remove(final_mp3_path)

    return audio_bytes, transcript
def _build_podcast_prompt(host_name, host_desc, guest_name, guest_desc,
                          user_specs, sponsor_content):
    """Assemble the system prompt plus any user-supplied customizations."""
    extra_instructions = []
    if host_name or guest_name:
        host_line = f"Host: {host_name or 'Jane'} - {host_desc or 'a curious host'}."
        guest_line = f"Guest: {guest_name or 'John'} - {guest_desc or 'an expert'}."
        extra_instructions.append(f"{host_line}\n{guest_line}")
    if user_specs.strip():
        extra_instructions.append(f"Additional User Instructions: {user_specs}")
    if sponsor_content.strip():
        extra_instructions.append(
            f"Sponsor Content Provided (should be under ~30 seconds):\n{sponsor_content}"
        )

    combined_instructions = "\n\n".join(extra_instructions).strip()
    full_prompt = SYSTEM_PROMPT
    if combined_instructions:
        full_prompt += f"\n\n# Additional Instructions\n{combined_instructions}\n"

    # Add language-specific instructions
    if st.session_state.get("language_selection") == "Hinglish":
        full_prompt += "\n\nPlease generate the script in Romanized Hindi.\n"
        # Add similar instruction here for Hindi
    return full_prompt


def _render_dialogue_to_mp3(dialogue, guest_name, custom_bg_music_path):
    """TTS each dialogue line, stitch with crossfades, mix in bg music.

    Returns (audio_bytes, transcript), or (None, error_message) when no
    audio segments were produced. Raises on synthesis/encoding failure.
    """
    audio_segments = []
    transcript = ""
    crossfade_duration = 50  # ms

    # Loop-invariant: the language choice cannot change mid-render.
    language = st.session_state.get("language_selection", "English (American)")
    for item in dialogue:
        if language in ["English (Indian)", "Hinglish", "Hindi"]:
            # Indic voices are selected by display name rather than role.
            tts_speaker = "John" if item.display_speaker.lower() == (guest_name or "John").lower() else "Jane"
        else:
            tts_speaker = item.speaker

        audio_file = generate_audio_mp3(item.text, tts_speaker)
        try:
            seg = AudioSegment.from_file(audio_file, format="mp3")
        finally:
            # Never leak the per-line TTS temp file, even if decoding fails.
            os.remove(audio_file)
        audio_segments.append(seg)
        transcript += f"**{item.display_speaker}**: {item.text}\n\n"

    if not audio_segments:
        return None, "No audio segments generated."

    combined_spoken = audio_segments[0]
    for seg in audio_segments[1:]:
        # Clamp the crossfade: pydub raises if it exceeds a segment length.
        fade = min(crossfade_duration, len(combined_spoken), len(seg))
        combined_spoken = combined_spoken.append(seg, crossfade=fade)

    final_mix = mix_with_bg_music(combined_spoken, custom_bg_music_path)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
        final_mix.export(temp_audio.name, format="mp3")
        final_mp3_path = temp_audio.name
    try:
        with open(final_mp3_path, "rb") as f:
            audio_bytes = f.read()
    finally:
        os.remove(final_mp3_path)
    return audio_bytes, transcript


def generate_podcast(
    research_topic_input,
    tone,
    length_minutes,
    host_name,
    host_desc,
    guest_name,
    guest_desc,
    user_specs,
    sponsor_content,
    sponsor_style,
    custom_bg_music_path
):
    """Generate a podcast MP3 + markdown transcript from the stored report.

    Requires a research topic and a previously generated report in
    st.session_state["report_content"]. Returns (audio_bytes, transcript)
    on success, or (None, error_message) on any failure.
    """
    if not research_topic_input:
        return None, "Please enter a topic to research for the podcast."

    text = st.session_state.get("report_content", "")  # Get report content
    if not text:
        return None, "Please generate a research report first, or enter a topic."

    full_prompt = _build_podcast_prompt(
        host_name, host_desc, guest_name, guest_desc, user_specs, sponsor_content
    )

    try:
        script = generate_script(
            full_prompt,
            text,
            tone,
            f"{length_minutes} Mins",
            host_name=host_name or "Jane",
            guest_name=guest_name or "John",
            sponsor_style=sponsor_style,
            sponsor_provided=bool(sponsor_content.strip())
        )
        # If language is Hinglish, transliterate script dialogues to IAST
        if st.session_state.get("language_selection") == "Hinglish":
            from indic_transliteration.sanscript import transliterate, DEVANAGARI, IAST
            for dialogue_item in script.dialogue:
                dialogue_item.text = transliterate(dialogue_item.text, DEVANAGARI, IAST)
    except Exception as e:
        return None, f"Error generating script: {str(e)}"

    try:
        return _render_dialogue_to_mp3(script.dialogue, guest_name, custom_bg_music_path)
    except Exception as e:
        return None, f"Error generating audio: {str(e)}"
def highlight_differences(original: str, edited: str) -> str:
    """Merge *edited* against *original* word-by-word.

    Words that survive or are introduced by the edit are emitted; words
    deleted from the original are dropped. (The per-word wrapper is a
    placeholder for future HTML highlighting markup.)
    """
    # Split once up front — the original re-split both strings on every
    # opcode, doing O(n) work per diff region.
    orig_words = original.split()
    edit_words = edited.split()
    matcher = difflib.SequenceMatcher(None, orig_words, edit_words)

    highlighted = []
    for opcode, i1, i2, j1, j2 in matcher.get_opcodes():
        if opcode == 'equal':
            highlighted.extend(orig_words[i1:i2])
        elif opcode in ('replace', 'insert'):
            highlighted.extend(edit_words[j1:j2])
        # 'delete': removed words are simply omitted.
    return ' '.join(highlighted)
def main():
    """Render the MyPod Streamlit app: report-generation and podcast modes."""
    st.set_page_config(
        page_title="MyPod v3: AI-Powered Podcast & Research",
        layout="centered"
    )

    # Placeholder for injected CSS/HTML (currently an empty block).
    st.markdown(""" """, unsafe_allow_html=True)

    logo_col, title_col = st.columns([1, 10])
    with logo_col:
        st.image("logomypod.jpg", width=70)
    with title_col:
        st.markdown("## MyPod v3: AI-Powered Podcast & Research")

    st.markdown("""
    Welcome to **MyPod**, your go-to AI-powered podcast generator and research report tool! 🎉

    MyPod now offers two main functionalities:

    1. **Generate Research Reports:** Provide a research topic, and MyPod will use its AI-powered research agent to create a comprehensive, well-structured research report in PDF format.
    2. **Generate Podcasts:** Transform your research topic (or the generated report) into an engaging, human-sounding podcast.

    Select your desired mode below and let the magic happen!
    """)

    with st.expander("How to Use"):
        st.markdown("""
        **For Research Reports:**
          1. Select "Generate Research Report".
          2. Enter your research topic.
          3. Click 'Generate Report'.
          4. MyPod will use its AI agent to research the topic and create a PDF report.
          5. Once generated, you can view and download the report.
        **For Podcasts:**
          1. Select "Generate Podcast".
          2. Enter the research topic (this will be used as the basis for the podcast). OR FIRST GENERATE A REPORT AND THEN SELECT PODCAST.
          3. Choose the tone, language, and target duration.
          4. Add custom names and descriptions for the speakers (optional).
          5. Add sponsored content (optional).
          6. Click 'Generate Podcast'.
        """, unsafe_allow_html=True)

    # --- Main Mode Selection ---
    mode = st.radio("Choose a Mode:", ["Generate Research Report", "Generate Podcast"])

    # --- Research Report Section ---
    if mode == "Generate Research Report":
        st.markdown("### Generate Research Report")
        research_topic_input = st.text_input("Enter your research topic:")
        report_button = st.button("Generate Report")

        if report_button:
            if not research_topic_input:
                st.error("Please enter a research topic.")
            else:
                with st.spinner("Researching and generating report... This may take several minutes."):
                    try:
                        report_content = run_research_agent(research_topic_input)
                        st.session_state["report_content"] = report_content

                        # Display report (basic text for now)
                        st.markdown("### Generated Report Preview")
                        st.text_area("Report Content", value=report_content, height=300)

                        # Generate PDF and offer download
                        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmpfile:
                            pdf_path = tmpfile.name
                        generate_report(report_content, filename=pdf_path)  # Generate PDF
                        with open(pdf_path, "rb") as f:
                            pdf_bytes = f.read()
                        os.remove(pdf_path)  # Clean up temp file

                        st.download_button(
                            label="Download Report (PDF)",
                            data=pdf_bytes,
                            file_name=f"{research_topic_input}_report.pdf",
                            mime="application/pdf"
                        )
                        st.success("Report generated successfully!")
                    except Exception as e:
                        st.error(f"An error occurred: {e}")

    # --- Podcast Generation Section ---
    elif mode == "Generate Podcast":
        st.markdown("### Generate Podcast")
        research_topic_input = st.text_input("Enter research topic for the podcast (or use a generated report):")
        tone = st.radio("Tone", ["Casual", "Formal", "Humorous", "Youthful"], index=0)
        length_minutes = st.slider("Podcast Length (in minutes)", 1, 60, 3)
        language = st.selectbox(
            "Choose Language and Accent",
            ["English (American)", "English (Indian)", "Hinglish", "Hindi"],
            index=0
        )
        # generate_podcast() and the Hinglish transliteration read this key.
        st.session_state["language_selection"] = language

        st.markdown("### Customize Your Podcast (Optional)")
        with st.expander("Set Host & Guest Names/Descriptions (Optional)"):
            host_name = st.text_input("Female Host Name (leave blank for 'Jane')")
            host_desc = st.text_input("Female Host Description (Optional)")
            guest_name = st.text_input("Male Guest Name (leave blank for 'John')")
            guest_desc = st.text_input("Male Guest Description (Optional)")

        user_specs = st.text_area("Any special instructions or prompts for the script? (Optional)", "")
        sponsor_content = st.text_area("Sponsored Content / Ad (Optional)", "")
        sponsor_style = st.selectbox("Sponsor Integration Style", ["Separate Break", "Blended"])

        custom_bg_music_file = st.file_uploader("Upload Custom Background Music (Optional)", type=["mp3", "wav"])
        custom_bg_music_path = None
        if custom_bg_music_file:
            # NOTE(review): this temp file is never removed after mixing —
            # confirm whether cleanup is intentional or a leak.
            with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(custom_bg_music_file.name)[1]) as tmp:
                tmp.write(custom_bg_music_file.read())
                custom_bg_music_path = tmp.name

        # Session-state defaults so values survive Streamlit reruns.
        if "audio_bytes" not in st.session_state:
            st.session_state["audio_bytes"] = None
        if "transcript" not in st.session_state:
            st.session_state["transcript"] = None
        if "transcript_original" not in st.session_state:
            st.session_state["transcript_original"] = None
        if "qa_count" not in st.session_state:
            st.session_state["qa_count"] = 0
        if "conversation_history" not in st.session_state:
            st.session_state["conversation_history"] = ""

        generate_button = st.button("Generate Podcast")
        if generate_button:
            # Cosmetic staged progress; the real work is in generate_podcast().
            progress_bar = st.progress(0)
            progress_text = st.empty()
            progress_messages = [
                "🔍 Analyzing your input...",
                "📝 Crafting the perfect script...",
                "🎙️ Generating high-quality audio...",
                "🎶 Adding the finishing touches..."
            ]
            progress_text.write(progress_messages[0])
            progress_bar.progress(0)
            time.sleep(1.0)
            progress_text.write(progress_messages[1])
            progress_bar.progress(25)
            time.sleep(1.0)
            progress_text.write(progress_messages[2])
            progress_bar.progress(50)
            time.sleep(1.0)
            progress_text.write(progress_messages[3])
            progress_bar.progress(75)
            time.sleep(1.0)

            audio_bytes, transcript = generate_podcast(
                research_topic_input,
                tone,
                length_minutes,
                host_name,
                host_desc,
                guest_name,
                guest_desc,
                user_specs,
                sponsor_content,
                sponsor_style,
                custom_bg_music_path
            )
            progress_bar.progress(100)
            progress_text.write("✅ Done!")

            if audio_bytes is None:
                # On failure the second tuple element is the error message.
                st.error(transcript)
                st.session_state["audio_bytes"] = None
                st.session_state["transcript"] = None
                st.session_state["transcript_original"] = None
            else:
                st.success("Podcast generated successfully!")
                st.session_state["audio_bytes"] = audio_bytes
                st.session_state["transcript"] = transcript
                st.session_state["transcript_original"] = transcript
                st.session_state["qa_count"] = 0
                st.session_state["conversation_history"] = ""

        if st.session_state.get("audio_bytes"):
            st.audio(st.session_state["audio_bytes"], format='audio/mp3')
            st.download_button(
                label="Download Podcast (MP3)",
                data=st.session_state["audio_bytes"],
                file_name="my_podcast.mp3",
                mime="audio/mpeg"
            )

            st.markdown("### Generated Transcript (Editable)")
            edited_text = st.text_area(
                "Feel free to tweak lines, fix errors, or reword anything.",
                value=st.session_state["transcript"],
                height=300
            )

            if st.session_state.get("transcript_original"):
                highlighted_transcript = highlight_differences(
                    st.session_state["transcript_original"],
                    edited_text
                )
                st.markdown("### **Edited Transcript Highlights**", unsafe_allow_html=True)
                st.markdown(highlighted_transcript, unsafe_allow_html=True)

            if st.button("Regenerate Audio From Edited Text"):
                regen_bar = st.progress(0)
                regen_text = st.empty()
                regen_text.write("🔄 Regenerating your podcast with the edits...")
                regen_bar.progress(25)
                time.sleep(1.0)
                regen_text.write("🔧 Adjusting the script based on your changes...")
                regen_bar.progress(50)
                time.sleep(1.0)

                dialogue_items = parse_user_edited_transcript(
                    edited_text,
                    host_name or "Jane",
                    guest_name or "John"
                )
                new_audio_bytes, new_transcript = regenerate_audio_from_dialogue(dialogue_items, custom_bg_music_path)
                regen_bar.progress(75)
                time.sleep(1.0)

                if new_audio_bytes is None:
                    regen_bar.progress(100)
                    # new_transcript carries the error message in this branch.
                    st.error(new_transcript)
                else:
                    regen_bar.progress(100)
                    regen_text.write("✅ Regeneration complete!")
                    st.success("Regenerated audio below:")
                    st.session_state["audio_bytes"] = new_audio_bytes
                    st.session_state["transcript"] = new_transcript
                    st.session_state["transcript_original"] = new_transcript
                    st.audio(new_audio_bytes, format='audio/mp3')
                    st.download_button(
                        label="Download Edited Podcast (MP3)",
                        data=new_audio_bytes,
                        file_name="my_podcast_edited.mp3",
                        mime="audio/mpeg"
                    )
                    st.markdown("### Updated Transcript")
                    st.markdown(new_transcript)

        st.markdown("## Post-Podcast Q&A")
        used_questions = st.session_state.get("qa_count", 0)
        remaining = MAX_QA_QUESTIONS - used_questions
        if remaining > 0:
            st.write(f"You can ask up to {remaining} more question(s).")
            typed_q = st.text_input("Type your follow-up question:")
            audio_q = st.audio_input("Or record an audio question (WAV)")

            if st.button("Submit Q&A"):
                if used_questions >= MAX_QA_QUESTIONS:
                    st.warning("You have reached the Q&A limit.")
                else:
                    question_text = typed_q.strip()
                    # A recorded audio question, when present, overrides typed text.
                    if audio_q is not None:
                        suffix = ".wav"
                        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                            tmp.write(audio_q.read())
                            local_audio_path = tmp.name
                        st.write("Transcribing your audio question...")
                        audio_transcript = transcribe_audio_deepgram(local_audio_path)
                        if audio_transcript:
                            question_text = audio_transcript

                    if not question_text:
                        st.warning("No question found (text or audio).")
                    else:
                        st.write("Generating an answer...")
                        ans_audio, ans_text = handle_qa_exchange(question_text)
                        if ans_audio:
                            st.audio(ans_audio, format='audio/mp3')
                            st.markdown(f"**John**: {ans_text}")
                            st.session_state["qa_count"] = used_questions + 1
                        else:
                            st.warning("No response could be generated.")
        else:
            st.write("You have used all 5 Q&A opportunities.")

    st.markdown("", unsafe_allow_html=True)


if __name__ == "__main__":
    main()