# SearchPod1.0 / app.py
# Origin: Hugging Face Space upload by siddhartharyaai
# ("Upload 4 files", commit 48c504d, verified).
import streamlit as st
import time
import re
import os
import tempfile
import pypdf
from pydub import AudioSegment, effects
import difflib
#CORRECTED IMPORT
from utils import (
generate_script,
generate_audio_mp3,
mix_with_bg_music,
DialogueItem,
run_research_agent,
generate_report
)
from prompts import SYSTEM_PROMPT
from qa import transcribe_audio_deepgram, handle_qa_exchange
MAX_QA_QUESTIONS = 5  # maximum number of voice/text follow-up questions per generated podcast
def parse_user_edited_transcript(edited_text: str, host_name: str, guest_name: str):
    """Parse a user-edited transcript back into DialogueItem objects.

    The expected transcript format is one utterance per match of
    ``**Speaker**: text``. If no such lines are found, the whole text is
    treated as a single utterance attributed to the host.

    Args:
        edited_text: The (possibly edited) transcript text.
        host_name: Display name of the host (maps to the "Jane" TTS voice).
        guest_name: Display name of the guest (maps to the "John" TTS voice).

    Returns:
        A list of DialogueItem, one per parsed utterance.
    """
    def resolve_speaker(raw_name: str) -> str:
        # Host takes priority over guest when the names collide; anyone
        # unrecognized falls back to the host ("Jane") voice. The original
        # fallback branch checked only the guest name, which mapped a
        # colliding host name to the guest voice — fixed by sharing this
        # resolver between both paths.
        if raw_name.lower() == host_name.lower():
            return "Jane"
        if raw_name.lower() == guest_name.lower():
            return "John"
        return "Jane"

    pattern = r"\*\*(.+?)\*\*:\s*(.+)"
    matches = re.findall(pattern, edited_text)

    if not matches:
        # No "**Name**: text" lines — treat the entire text as one host line.
        raw_name = host_name or "Jane"
        return [DialogueItem(
            speaker=resolve_speaker(raw_name),
            display_speaker=raw_name,
            text=edited_text.strip()
        )]

    return [
        DialogueItem(
            speaker=resolve_speaker(raw_name),
            display_speaker=raw_name,
            text=text_line
        )
        for raw_name, text_line in matches
    ]
def regenerate_audio_from_dialogue(dialogue_items, custom_bg_music_path=None):
    """Re-synthesize audio for edited dialogue items and mix with bg music.

    Args:
        dialogue_items: Iterable of DialogueItem (speaker, display_speaker, text).
        custom_bg_music_path: Optional path to user-supplied background music.

    Returns:
        (audio_bytes, transcript) on success, or (None, error_message) when
        no audio segments could be produced.
    """
    audio_segments = []
    transcript = ""
    crossfade_duration = 50  # ms of overlap between consecutive utterances

    for item in dialogue_items:
        audio_file = generate_audio_mp3(item.text, item.speaker)
        try:
            seg = AudioSegment.from_file(audio_file, format="mp3")
        finally:
            # Remove the per-utterance temp file even if decoding fails
            # (previously it leaked on exception).
            os.remove(audio_file)
        audio_segments.append(seg)
        transcript += f"**{item.display_speaker}**: {item.text}\n\n"

    if not audio_segments:
        return None, "No audio segments were generated."

    combined_spoken = audio_segments[0]
    for seg in audio_segments[1:]:
        combined_spoken = combined_spoken.append(seg, crossfade=crossfade_duration)

    final_mix = mix_with_bg_music(combined_spoken, custom_bg_music_path)

    # Reserve a temp path, then export/read under try/finally so the file
    # is always cleaned up, even if export or read raises.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
        final_mp3_path = temp_audio.name
    try:
        final_mix.export(final_mp3_path, format="mp3")
        with open(final_mp3_path, "rb") as f:
            audio_bytes = f.read()
    finally:
        os.remove(final_mp3_path)

    return audio_bytes, transcript
def generate_podcast(
    research_topic_input,
    tone,
    length_minutes,
    host_name,
    host_desc,
    guest_name,
    guest_desc,
    user_specs,
    sponsor_content,
    sponsor_style,
    custom_bg_music_path
):
    """Generate a podcast (mp3 bytes + markdown transcript) from the research
    report previously stored in st.session_state["report_content"].

    Returns:
        (audio_bytes, transcript) on success, or (None, error_message) when
        input is missing or script/audio generation fails.
    """
    if not research_topic_input:
        return None, "Please enter a topic to research for the podcast."
    # The script is based on the previously generated research report.
    text = st.session_state.get("report_content", "")
    if not text:
        return None, "Please generate a research report first, or enter a topic."

    # Collect optional user-supplied instructions for the script generator.
    extra_instructions = []
    if host_name or guest_name:
        host_line = f"Host: {host_name or 'Jane'} - {host_desc or 'a curious host'}."
        guest_line = f"Guest: {guest_name or 'John'} - {guest_desc or 'an expert'}."
        extra_instructions.append(f"{host_line}\n{guest_line}")
    if user_specs.strip():
        extra_instructions.append(f"Additional User Instructions: {user_specs}")
    if sponsor_content.strip():
        extra_instructions.append(
            f"Sponsor Content Provided (should be under ~30 seconds):\n{sponsor_content}"
        )

    combined_instructions = "\n\n".join(extra_instructions).strip()
    full_prompt = SYSTEM_PROMPT
    if combined_instructions:
        full_prompt += f"\n\n# Additional Instructions\n{combined_instructions}\n"

    # Add language-specific instructions to the prompt.
    if st.session_state.get("language_selection") == "Hinglish":
        full_prompt += "\n\nPlease generate the script in Romanized Hindi.\n"
    # TODO: add a similar language instruction here for Hindi.

    try:
        script = generate_script(
            full_prompt,
            text,
            tone,
            f"{length_minutes} Mins",
            host_name=host_name or "Jane",
            guest_name=guest_name or "John",
            sponsor_style=sponsor_style,
            sponsor_provided=bool(sponsor_content.strip())
        )
        # If language is Hinglish, transliterate script dialogues from
        # Devanagari to IAST (Latin script) so the TTS engine can read them.
        if st.session_state.get("language_selection") == "Hinglish":
            from indic_transliteration.sanscript import transliterate, DEVANAGARI, IAST
            for dialogue_item in script.dialogue:
                dialogue_item.text = transliterate(dialogue_item.text, DEVANAGARI, IAST)
    except Exception as e:
        return None, f"Error generating script: {str(e)}"

    audio_segments = []
    transcript = ""
    crossfade_duration = 50  # ms of overlap between consecutive utterances
    try:
        for item in script.dialogue:
            language = st.session_state.get("language_selection", "English (American)")
            # For Indian-accent / Hinglish / Hindi output the TTS voice is
            # chosen by the *display* name (guest -> "John", everyone else
            # -> "Jane"); otherwise the script's own speaker tag is used.
            if language in ["English (Indian)", "Hinglish", "Hindi"]:
                tts_speaker = "John" if item.display_speaker.lower() == (guest_name or "John").lower() else "Jane"
            else:
                tts_speaker = item.speaker
            audio_file = generate_audio_mp3(item.text, tts_speaker)
            seg = AudioSegment.from_file(audio_file, format="mp3")
            audio_segments.append(seg)
            transcript += f"**{item.display_speaker}**: {item.text}\n\n"
            os.remove(audio_file)
        if not audio_segments:
            return None, "No audio segments generated."
        # Stitch utterances together with a short crossfade, then mix in music.
        combined_spoken = audio_segments[0]
        for seg in audio_segments[1:]:
            combined_spoken = combined_spoken.append(seg, crossfade=crossfade_duration)
        final_mix = mix_with_bg_music(combined_spoken, custom_bg_music_path)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
            final_mix.export(temp_audio.name, format="mp3")
            final_mp3_path = temp_audio.name
        with open(final_mp3_path, "rb") as f:
            audio_bytes = f.read()
        os.remove(final_mp3_path)
        return audio_bytes, transcript
    except Exception as e:
        return None, f"Error generating audio: {str(e)}"
def highlight_differences(original: str, edited: str) -> str:
    """Return *edited* as HTML where words added or changed relative to
    *original* are wrapped in a red ``<span>``; deleted words are dropped.

    Args:
        original: The original transcript text.
        edited: The user-edited transcript text.

    Returns:
        A space-joined word sequence with changed/inserted words highlighted.
    """
    # Split once up front; the original re-split both strings inside the
    # opcode loop, doing redundant O(n) work per opcode.
    orig_words = original.split()
    edit_words = edited.split()
    matcher = difflib.SequenceMatcher(None, orig_words, edit_words)
    highlighted = []
    for opcode, i1, i2, j1, j2 in matcher.get_opcodes():
        if opcode == 'equal':
            highlighted.extend(orig_words[i1:i2])
        elif opcode in ('replace', 'insert'):
            highlighted.extend(
                f'<span style="color:red">{word}</span>'
                for word in edit_words[j1:j2]
            )
        # 'delete': removed words are intentionally omitted from the output.
    return ' '.join(highlighted)
def main():
    """Streamlit entry point: renders the research-report and podcast
    generation UI, transcript editing, audio regeneration, and the
    post-podcast Q&A flow."""
    st.set_page_config(
        page_title="MyPod v3: AI-Powered Podcast & Research",
        layout="centered"
    )
    # Global CSS: shrink the file-uploader widget and style the page footer.
    st.markdown("""
<style>
.stFileUploader>div>div>div {
transform: scale(0.9);
}
footer {
text-align: center;
padding: 1em 0;
font-size: 0.8em;
color: #888;
}
</style>
""", unsafe_allow_html=True)

    # Header: logo next to the page title.
    logo_col, title_col = st.columns([1, 10])
    with logo_col:
        st.image("logomypod.jpg", width=70)
    with title_col:
        st.markdown("## MyPod v3: AI-Powered Podcast & Research")

    # Welcome blurb describing the two app modes.
    st.markdown("""
Welcome to **MyPod**, your go-to AI-powered podcast generator and research report tool! 🎉
MyPod now offers two main functionalities:
1. **Generate Research Reports:** Provide a research topic, and MyPod will use its AI-powered research agent to create a comprehensive, well-structured research report in PDF format.
2. **Generate Podcasts:** Transform your research topic (or the generated report) into an engaging, human-sounding podcast.
Select your desired mode below and let the magic happen!
""")

    with st.expander("How to Use"):
        st.markdown("""
**For Research Reports:**
<ol style="font-size:18px;">
<li>Select "Generate Research Report".</li>
<li>Enter your research topic.</li>
<li>Click 'Generate Report'.</li>
<li>MyPod will use its AI agent to research the topic and create a PDF report.</li>
<li>Once generated, you can view and download the report.</li>
</ol>
**For Podcasts:**
<ol style="font-size:18px;">
<li>Select "Generate Podcast".</li>
<li>Enter the research topic (this will be used as the basis for the podcast). OR FIRST GENERATE A REPORT AND THEN SELECT PODCAST.</li>
<li>Choose the tone, language, and target duration.</li>
<li>Add custom names and descriptions for the speakers (optional).</li>
<li>Add sponsored content (optional).</li>
<li>Click 'Generate Podcast'.</li>
</ol>
""", unsafe_allow_html=True)

    # --- Main Mode Selection ---
    mode = st.radio("Choose a Mode:", ["Generate Research Report", "Generate Podcast"])

    # --- Research Report Section ---
    if mode == "Generate Research Report":
        st.markdown("### Generate Research Report")
        research_topic_input = st.text_input("Enter your research topic:")
        report_button = st.button("Generate Report")
        if report_button:
            if not research_topic_input:
                st.error("Please enter a research topic.")
            else:
                with st.spinner("Researching and generating report... This may take several minutes."):
                    try:
                        report_content = run_research_agent(research_topic_input)
                        # Persist so the podcast mode can reuse the report.
                        st.session_state["report_content"] = report_content
                        # Display report (basic text preview for now).
                        st.markdown("### Generated Report Preview")
                        st.text_area("Report Content", value=report_content, height=300)
                        # Render to PDF and offer it as a download.
                        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmpfile:
                            pdf_path = tmpfile.name
                        generate_report(report_content, filename=pdf_path)
                        with open(pdf_path, "rb") as f:
                            pdf_bytes = f.read()
                        os.remove(pdf_path)  # Clean up the temp PDF.
                        st.download_button(
                            label="Download Report (PDF)",
                            data=pdf_bytes,
                            file_name=f"{research_topic_input}_report.pdf",
                            mime="application/pdf"
                        )
                        st.success("Report generated successfully!")
                    except Exception as e:
                        st.error(f"An error occurred: {e}")

    # --- Podcast Generation Section ---
    elif mode == "Generate Podcast":
        st.markdown("### Generate Podcast")
        research_topic_input = st.text_input("Enter research topic for the podcast (or use a generated report):")
        tone = st.radio("Tone", ["Casual", "Formal", "Humorous", "Youthful"], index=0)
        length_minutes = st.slider("Podcast Length (in minutes)", 1, 60, 3)
        language = st.selectbox(
            "Choose Language and Accent",
            ["English (American)", "English (Indian)", "Hinglish", "Hindi"],
            index=0
        )
        # generate_podcast() reads the language choice from session state.
        st.session_state["language_selection"] = language

        st.markdown("### Customize Your Podcast (Optional)")
        with st.expander("Set Host & Guest Names/Descriptions (Optional)"):
            host_name = st.text_input("Female Host Name (leave blank for 'Jane')")
            host_desc = st.text_input("Female Host Description (Optional)")
            guest_name = st.text_input("Male Guest Name (leave blank for 'John')")
            guest_desc = st.text_input("Male Guest Description (Optional)")
        user_specs = st.text_area("Any special instructions or prompts for the script? (Optional)", "")
        sponsor_content = st.text_area("Sponsored Content / Ad (Optional)", "")
        sponsor_style = st.selectbox("Sponsor Integration Style", ["Separate Break", "Blended"])

        # Optional custom background music: persist the upload to a temp file
        # so pydub can read it by path.
        custom_bg_music_file = st.file_uploader("Upload Custom Background Music (Optional)", type=["mp3", "wav"])
        custom_bg_music_path = None
        if custom_bg_music_file:
            with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(custom_bg_music_file.name)[1]) as tmp:
                tmp.write(custom_bg_music_file.read())
                custom_bg_music_path = tmp.name

        # Initialize session-state slots used across reruns.
        if "audio_bytes" not in st.session_state:
            st.session_state["audio_bytes"] = None
        if "transcript" not in st.session_state:
            st.session_state["transcript"] = None
        if "transcript_original" not in st.session_state:
            st.session_state["transcript_original"] = None
        if "qa_count" not in st.session_state:
            st.session_state["qa_count"] = 0
        if "conversation_history" not in st.session_state:
            st.session_state["conversation_history"] = ""

        generate_button = st.button("Generate Podcast")
        if generate_button:
            # Cosmetic staged progress bar; the real work happens in
            # generate_podcast() below.
            progress_bar = st.progress(0)
            progress_text = st.empty()
            progress_messages = [
                "🔍 Analyzing your input...",
                "📝 Crafting the perfect script...",
                "🎙️ Generating high-quality audio...",
                "🎶 Adding the finishing touches..."
            ]
            progress_text.write(progress_messages[0])
            progress_bar.progress(0)
            time.sleep(1.0)
            progress_text.write(progress_messages[1])
            progress_bar.progress(25)
            time.sleep(1.0)
            progress_text.write(progress_messages[2])
            progress_bar.progress(50)
            time.sleep(1.0)
            progress_text.write(progress_messages[3])
            progress_bar.progress(75)
            time.sleep(1.0)
            audio_bytes, transcript = generate_podcast(
                research_topic_input,
                tone,
                length_minutes,
                host_name,
                host_desc,
                guest_name,
                guest_desc,
                user_specs,
                sponsor_content,
                sponsor_style,
                custom_bg_music_path
            )
            progress_bar.progress(100)
            progress_text.write("✅ Done!")
            # On failure, generate_podcast returns (None, error_message).
            if audio_bytes is None:
                st.error(transcript)
                st.session_state["audio_bytes"] = None
                st.session_state["transcript"] = None
                st.session_state["transcript_original"] = None
            else:
                st.success("Podcast generated successfully!")
                st.session_state["audio_bytes"] = audio_bytes
                st.session_state["transcript"] = transcript
                st.session_state["transcript_original"] = transcript
                # Reset Q&A state for the new podcast.
                st.session_state["qa_count"] = 0
                st.session_state["conversation_history"] = ""

        # Player, transcript editor, regeneration, and Q&A — shown whenever
        # a podcast exists in session state (survives Streamlit reruns).
        if st.session_state.get("audio_bytes"):
            st.audio(st.session_state["audio_bytes"], format='audio/mp3')
            st.download_button(
                label="Download Podcast (MP3)",
                data=st.session_state["audio_bytes"],
                file_name="my_podcast.mp3",
                mime="audio/mpeg"
            )

            st.markdown("### Generated Transcript (Editable)")
            edited_text = st.text_area(
                "Feel free to tweak lines, fix errors, or reword anything.",
                value=st.session_state["transcript"],
                height=300
            )
            # Show a word-level diff of the user's edits against the original.
            if st.session_state.get("transcript_original"):
                highlighted_transcript = highlight_differences(
                    st.session_state["transcript_original"],
                    edited_text
                )
                st.markdown("### **Edited Transcript Highlights**", unsafe_allow_html=True)
                st.markdown(highlighted_transcript, unsafe_allow_html=True)

            if st.button("Regenerate Audio From Edited Text"):
                regen_bar = st.progress(0)
                regen_text = st.empty()
                regen_text.write("🔄 Regenerating your podcast with the edits...")
                regen_bar.progress(25)
                time.sleep(1.0)
                regen_text.write("🔧 Adjusting the script based on your changes...")
                regen_bar.progress(50)
                time.sleep(1.0)
                # Rebuild dialogue items from the edited markdown transcript.
                dialogue_items = parse_user_edited_transcript(
                    edited_text,
                    host_name or "Jane",
                    guest_name or "John"
                )
                new_audio_bytes, new_transcript = regenerate_audio_from_dialogue(dialogue_items, custom_bg_music_path)
                regen_bar.progress(75)
                time.sleep(1.0)
                if new_audio_bytes is None:
                    regen_bar.progress(100)
                    st.error(new_transcript)
                else:
                    regen_bar.progress(100)
                    regen_text.write("✅ Regeneration complete!")
                    st.success("Regenerated audio below:")
                    st.session_state["audio_bytes"] = new_audio_bytes
                    st.session_state["transcript"] = new_transcript
                    st.session_state["transcript_original"] = new_transcript
                    st.audio(new_audio_bytes, format='audio/mp3')
                    st.download_button(
                        label="Download Edited Podcast (MP3)",
                        data=new_audio_bytes,
                        file_name="my_podcast_edited.mp3",
                        mime="audio/mpeg"
                    )
                    st.markdown("### Updated Transcript")
                    st.markdown(new_transcript)

            # --- Post-Podcast Q&A (typed or voice, capped at MAX_QA_QUESTIONS) ---
            st.markdown("## Post-Podcast Q&A")
            used_questions = st.session_state.get("qa_count", 0)
            remaining = MAX_QA_QUESTIONS - used_questions
            if remaining > 0:
                st.write(f"You can ask up to {remaining} more question(s).")
                typed_q = st.text_input("Type your follow-up question:")
                audio_q = st.audio_input("Or record an audio question (WAV)")
                if st.button("Submit Q&A"):
                    if used_questions >= MAX_QA_QUESTIONS:
                        st.warning("You have reached the Q&A limit.")
                    else:
                        question_text = typed_q.strip()
                        # A recorded audio question, if present, overrides the
                        # typed one after transcription succeeds.
                        if audio_q is not None:
                            suffix = ".wav"
                            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                                tmp.write(audio_q.read())
                                local_audio_path = tmp.name
                            st.write("Transcribing your audio question...")
                            audio_transcript = transcribe_audio_deepgram(local_audio_path)
                            if audio_transcript:
                                question_text = audio_transcript
                        if not question_text:
                            st.warning("No question found (text or audio).")
                        else:
                            st.write("Generating an answer...")
                            ans_audio, ans_text = handle_qa_exchange(question_text)
                            if ans_audio:
                                st.audio(ans_audio, format='audio/mp3')
                                st.markdown(f"**John**: {ans_text}")
                                st.session_state["qa_count"] = used_questions + 1
                            else:
                                st.warning("No response could be generated.")
            else:
                st.write("You have used all 5 Q&A opportunities.")

    st.markdown("<footer>©2025 MyPod. All rights reserved.</footer>", unsafe_allow_html=True)
# Standard entry-point guard so the module can be imported without
# launching the Streamlit app.
if __name__ == "__main__":
    main()