# Spaces: Running
# app.py
import difflib  # For computing differences between texts
import html
import os
import re
import tempfile
import time

import pypdf
import streamlit as st
from pydub import AudioSegment, effects

from prompts import SYSTEM_PROMPT
from utils import (
    generate_script,
    generate_audio_mp3,
    truncate_text,
    extract_text_from_url,
    transcribe_youtube_video,
    research_topic,
    mix_with_bg_music,
    DialogueItem,  # so we can construct items
)
def parse_user_edited_transcript(edited_text: str, host_name: str, guest_name: str):
    """
    Turn a user-edited transcript back into a list of DialogueItem objects.

    A speaker turn starts on a line containing a bold marker, e.g.:
        **Angela**: Hello
        **Dimitris**: Great topic...
    The marker name is kept as ``display_speaker``. It is mapped to a TTS voice
    by case-insensitive comparison: host_name -> "Jane", guest_name -> "John",
    anything else -> "Jane".

    Non-blank lines without a marker are appended to the preceding turn, so a
    manual line break inside a turn does not drop the rest of it. Text that
    appears before the first marker is ignored. When the transcript has no
    marker at all, the whole text becomes a single turn attributed to the host.
    Turns without any text are skipped, so blank input yields an empty list.

    Args:
        edited_text: Transcript text as edited by the user (may be empty/None).
        host_name: Display name of the host (may be empty/None).
        guest_name: Display name of the guest (may be empty/None).

    Returns:
        A list of DialogueItem, in transcript order.
    """
    marker = re.compile(r"\*\*(.+?)\*\*:\s*(.*)")
    transcript_text = edited_text or ""
    host_label = (host_name or "").strip()
    host_key = host_label.casefold()
    guest_key = (guest_name or "").strip().casefold()

    def voice_for(name):
        # The host check wins, so a name shared by host and guest maps to Jane.
        key = name.strip().casefold()
        if key != host_key and key == guest_key:
            return "John"
        return "Jane"

    # Each entry is (display name, [text fragments]); fragments are joined
    # only at the end so the DialogueItem is constructed exactly once.
    turns = []
    for line in transcript_text.splitlines():
        found = marker.search(line)
        if found:
            turns.append((found.group(1).strip(), [found.group(2).strip()]))
        elif turns and line.strip():
            # Continuation of the previous speaker's turn.
            turns[-1][1].append(line.strip())

    if not turns:
        # No markers found, treat the entire text as if it's the host speaking.
        whole_text = transcript_text.strip()
        if not whole_text:
            return []
        turns = [(host_label or "Jane", [whole_text])]

    items = []
    for display_name, fragments in turns:
        spoken = " ".join(part for part in fragments if part)
        if not spoken:
            # Nothing to synthesize for this turn.
            continue
        items.append(
            DialogueItem(
                speaker=voice_for(display_name),
                display_speaker=display_name,
                text=spoken,
            )
        )
    return items
def regenerate_audio_from_dialogue(dialogue_items, custom_bg_music_path=None):
    """
    Re-generate multi-speaker audio from DialogueItems and mix it with music.

    Each item is synthesized with generate_audio_mp3(item.text, item.speaker),
    the clips are appended in order with a short crossfade, and the result is
    passed to mix_with_bg_music together with custom_bg_music_path.

    Args:
        dialogue_items: Iterable of items exposing ``text``, ``speaker`` and
            ``display_speaker``.
        custom_bg_music_path: Optional path forwarded to mix_with_bg_music.

    Returns:
        (audio_bytes, transcript) on success, where transcript uses
        display_speaker for each line. On failure returns (None, message);
        errors are reported this way instead of being raised.
    """
    crossfade_duration = 50  # in ms
    audio_segments = []
    transcript_parts = []
    final_mp3_path = None
    try:
        for item in dialogue_items:
            audio_file = generate_audio_mp3(item.text, item.speaker)
            try:
                audio_segments.append(AudioSegment.from_file(audio_file, format="mp3"))
            finally:
                # Remove the per-line clip even when decoding fails.
                os.remove(audio_file)
            # Use item.display_speaker for the text transcript
            transcript_parts.append(f"**{item.display_speaker}**: {item.text}\n\n")

        if not audio_segments:
            return None, "No audio segments were generated."

        # Combine spoken segments sequentially
        combined_spoken = audio_segments[0]
        for seg in audio_segments[1:]:
            # pydub rejects a crossfade longer than either clip, so clamp it.
            overlap = min(crossfade_duration, len(combined_spoken), len(seg))
            combined_spoken = combined_spoken.append(seg, crossfade=overlap)

        final_mix = mix_with_bg_music(combined_spoken, custom_bg_music_path)

        # Reserve a path, then close the handle before exporting to it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
            final_mp3_path = temp_audio.name
        final_mix.export(final_mp3_path, format="mp3")
        with open(final_mp3_path, "rb") as f:
            audio_bytes = f.read()
        return audio_bytes, "".join(transcript_parts)
    except Exception as e:
        return None, f"Error generating audio: {str(e)}"
    finally:
        if final_mp3_path and os.path.exists(final_mp3_path):
            os.remove(final_mp3_path)
def generate_podcast(
    file,
    url,
    video_url,
    research_topic_input,
    tone,
    length_minutes,
    host_name,
    host_desc,
    guest_name,
    guest_desc,
    user_specs,
    sponsor_content,
    custom_bg_music_path
):
    """
    Create a multi-speaker podcast from a PDF, URL, YouTube link, or topic.

    Exactly one of file / url / video_url / research_topic_input must be
    provided. The source text is truncated with truncate_text, combined with
    SYSTEM_PROMPT plus any host/guest, user and sponsor instructions, and
    passed to generate_script. The resulting dialogue is rendered to audio by
    regenerate_audio_from_dialogue.

    Uses female voice (Jane) for host, male voice (John) for guest.
    display_speaker is the user-chosen name, speaker is "Jane" or "John".

    Returns:
        (audio_bytes, transcript_str) on success, or (None, error_message)
        when validation, extraction, script generation or audio fails.
    """
    sources = [bool(file), bool(url), bool(video_url), bool(research_topic_input)]
    if sum(sources) > 1:
        return None, "Provide only one input (PDF, URL, YouTube, or Research topic)."
    if not any(sources):
        return None, "Please provide at least one source."

    text = ""
    if file:
        try:
            if not file.name.lower().endswith(".pdf"):
                return None, "Please upload a PDF file."
            reader = pypdf.PdfReader(file)
            # Extract each page once; pages without text are skipped.
            page_texts = (page.extract_text() for page in reader.pages)
            text = " ".join(page_text for page_text in page_texts if page_text)
        except Exception as e:
            return None, f"Error reading PDF: {str(e)}"
        # Same rule as the other sources: no text means nothing to script.
        if not text.strip():
            return None, "Failed to extract text from PDF."
    elif url:
        try:
            text = extract_text_from_url(url)
            if not text:
                return None, "Failed to extract text from URL."
        except Exception as e:
            return None, f"Error extracting text from URL: {str(e)}"
    elif video_url:
        try:
            text = transcribe_youtube_video(video_url)
            if not text:
                return None, "Failed to transcribe YouTube video."
        except Exception as e:
            return None, f"Error transcribing YouTube video: {str(e)}"
    elif research_topic_input:
        try:
            text = research_topic(research_topic_input)
            if not text:
                return None, f"Sorry, no information found on '{research_topic_input}'."
        except Exception as e:
            return None, f"Error researching topic: {str(e)}"

    # Truncate if needed
    text = truncate_text(text)

    # Build extra instructions (None is treated like an empty field)
    user_specs = user_specs or ""
    sponsor_content = sponsor_content or ""
    extra_instructions = []
    if host_name or guest_name:
        h = f"Host: {host_name or 'Jane'} - {host_desc or 'a curious host'}."
        g = f"Guest: {guest_name or 'John'} - {guest_desc or 'an expert'}."
        extra_instructions.append(f"{h}\n{g}")
    if user_specs.strip():
        extra_instructions.append(f"Additional User Instructions: {user_specs}")
    if sponsor_content.strip():
        extra_instructions.append(
            "Please include a short sponsored advertisement. The sponsor text is as follows:\n"
            + sponsor_content
        )

    combined_instructions = "\n\n".join(extra_instructions).strip()
    full_prompt = SYSTEM_PROMPT
    if combined_instructions:
        full_prompt += f"\n\n# Additional Instructions\n{combined_instructions}\n"

    # Use "generate_script" with host/guest name so it can do the mapping
    try:
        script = generate_script(
            full_prompt,
            text,
            tone,
            f"{length_minutes} Mins",
            host_name=host_name or "Jane",
            guest_name=guest_name or "John"
        )
    except Exception as e:
        return None, f"Error generating script: {str(e)}"

    try:
        dialogue = list(script.dialogue)
        if not dialogue:
            return None, "No audio segments generated."
        # Synthesis, crossfading and music mixing are shared with the
        # regenerate path; any exception it raises is reported below.
        return regenerate_audio_from_dialogue(dialogue, custom_bg_music_path)
    except Exception as e:
        return None, f"Error generating audio: {str(e)}"
def highlight_differences(original: str, edited: str) -> str:
    """
    Highlight the differences between the original and edited transcripts.

    Both texts are compared word by word (split on whitespace). Unchanged
    words are emitted as-is, added or replacing words are wrapped in a red
    <span>, and words deleted from the original are omitted. Because the
    result is meant to be rendered as HTML, the characters &, < and > in
    every word are escaped so transcript text cannot inject markup.

    Returns:
        The words joined by single spaces, with highlighting markup.
    """
    # Split once; the opcodes index into these lists.
    original_words = (original or "").split()
    edited_words = (edited or "").split()
    matcher = difflib.SequenceMatcher(None, original_words, edited_words)

    highlighted = []
    for opcode, i1, i2, j1, j2 in matcher.get_opcodes():
        if opcode == "equal":
            highlighted.extend(
                html.escape(word, quote=False) for word in original_words[i1:i2]
            )
        elif opcode in ("replace", "insert"):
            highlighted.extend(
                f'<span style="color:red">{html.escape(word, quote=False)}</span>'
                for word in edited_words[j1:j2]
            )
        # 'delete': removed words are intentionally not shown.
    return " ".join(highlighted)
def _save_upload_to_temp(uploaded_file):
    """
    Write an uploaded file to a named temporary file and return its path.

    Returns None when nothing was uploaded. The caller owns the file and is
    responsible for removing it.
    """
    if not uploaded_file:
        return None
    suffix = os.path.splitext(uploaded_file.name)[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        # getvalue() returns the whole payload regardless of the current read
        # position, unlike read().
        tmp.write(uploaded_file.getvalue())
        return tmp.name


def _remove_temp_file(path):
    """Delete a temporary file; a missing path or failed removal is ignored."""
    if not path:
        return
    try:
        os.remove(path)
    except OSError:
        pass


def main():
    """
    Render the MyPod Streamlit page.

    Collects one input source plus optional customizations, generates the
    podcast on "Generate Podcast", and stores the audio and transcript in
    st.session_state so they survive reruns. Once audio exists, the transcript
    can be edited, the edits are highlighted, and the audio can be regenerated
    from the edited text.
    """
    st.set_page_config(page_title="MyPod - AI-based Podcast Generator", layout="centered")
    st.markdown("## MyPod - AI powered Podcast Generator")
    st.markdown(
        "Welcome to **MyPod**, your go-to AI-powered podcast generator! 🎉\n\n"
        "MyPod transforms your documents, webpages, YouTube videos, or research topics into a more human-sounding, conversational podcast.\n"
        "Select a tone and a duration range. The script will be on-topic, concise, and respect your chosen length.\n\n"
        "### How to use:\n"
        "1. **Provide one source:** PDF Files, Website URL, YouTube link or a Topic to Research.\n"
        "2. **Choose the tone and the target duration.**\n"
        "3. **Click 'Generate Podcast'** to produce your podcast. After the audio is generated, you can edit the transcript and re-generate the audio with your edits if needed.\n\n"
        "**Token Limit:** Up to ~2,048 tokens are supported. Long inputs may be truncated.\n"
        "**Note:** YouTube videos will only work if they have captions built in.\n\n"
        "⏳**Please be patient while your podcast is being generated.** This process involves content analysis, script creation, "
        "and high-quality audio synthesis, which may take a few minutes.\n\n"
        "🔥 **Ready to create your personalized podcast?** Give it a try now and let the magic happen! 🔥"
    )

    col1, col2 = st.columns(2)
    with col1:
        file = st.file_uploader("Upload File (.pdf only)", type=["pdf"])
        url = st.text_input("Or Enter Website URL")
        video_url = st.text_input("Or Enter YouTube Link (Captioned videos)")
    with col2:
        research_topic_input = st.text_input("Or Research a Topic")
        tone = st.radio("Tone", ["Humorous", "Formal", "Casual", "Youthful"], index=2)
        length_minutes = st.slider("Podcast Length (in minutes)", 1, 60, 3)

    st.markdown("### Customize Your Podcast (New Features)")
    with st.expander("Set Host & Guest Names/Descriptions (Optional)"):
        host_name = st.text_input("Host Name (leave blank for 'Jane')")
        host_desc = st.text_input("Host Description (Optional)")
        guest_name = st.text_input("Guest Name (leave blank for 'John')")
        guest_desc = st.text_input("Guest Description (Optional)")

    user_specs = st.text_area("Any special instructions or prompts for the script? (Optional)", "")
    sponsor_content = st.text_area("Sponsored Content / Ad (Optional)", "")
    custom_bg_music_file = st.file_uploader("Upload Custom Background Music (Optional)", type=["mp3", "wav"])

    # Results are kept in session state so they persist across reruns.
    for state_key in ("audio_bytes", "transcript", "transcript_original"):
        if state_key not in st.session_state:
            st.session_state[state_key] = None

    generate_button = st.button("Generate Podcast")
    if generate_button:
        progress_bar = st.progress(0)
        progress_text = st.empty()
        messages = [
            "🔍 Analyzing your input...",
            "📝 Crafting the perfect script...",
            "🎙️ Generating high-quality audio...",
            "🎶 Adding the finishing touches..."
        ]
        # Staged status messages shown before the actual generation call.
        for step, message in enumerate(messages):
            progress_text.write(message)
            progress_bar.progress(step * 25)
            time.sleep(1.0)

        # The music is copied to disk only for the duration of the call, so
        # reruns of the script do not accumulate temporary files.
        custom_bg_music_path = _save_upload_to_temp(custom_bg_music_file)
        try:
            audio_bytes, transcript = generate_podcast(
                file,
                url,
                video_url,
                research_topic_input,
                tone,
                length_minutes,
                host_name,
                host_desc,
                guest_name,
                guest_desc,
                user_specs,
                sponsor_content,
                custom_bg_music_path
            )
        finally:
            _remove_temp_file(custom_bg_music_path)

        progress_bar.progress(100)
        progress_text.write("✅ Done!")

        if audio_bytes is None:
            # On failure the second element carries the error message.
            st.error(transcript)
            st.session_state["audio_bytes"] = None
            st.session_state["transcript"] = None
            st.session_state["transcript_original"] = None
        else:
            st.success("Podcast generated successfully!")
            st.session_state["audio_bytes"] = audio_bytes
            st.session_state["transcript"] = transcript
            st.session_state["transcript_original"] = transcript

    if st.session_state["audio_bytes"]:
        st.audio(st.session_state["audio_bytes"], format='audio/mp3')
        st.download_button(
            label="Download Podcast (MP3)",
            data=st.session_state["audio_bytes"],
            file_name="my_podcast.mp3",
            mime="audio/mpeg"
        )

        st.markdown("### Generated Transcript (Editable)")
        edited_text = st.text_area(
            "Feel free to tweak lines, fix errors, or reword anything.",
            value=st.session_state["transcript"],
            height=300
        )

        if st.session_state["transcript_original"]:
            highlighted = highlight_differences(
                st.session_state["transcript_original"],
                edited_text
            )
            st.markdown("### **Edited Transcript Highlights**", unsafe_allow_html=True)
            st.markdown(highlighted, unsafe_allow_html=True)

        if st.button("Regenerate Audio From Edited Text"):
            regen_bar = st.progress(0)
            regen_text = st.empty()
            regen_text.write("🔄 Regenerating your podcast with the edits...")
            regen_bar.progress(25)
            time.sleep(1.0)
            regen_text.write("🔧 Adjusting the script based on your changes...")
            regen_bar.progress(50)
            time.sleep(1.0)

            # Parse lines, map to DialogueItem with correct TTS speaker
            # host => female (Jane), guest => male (John)
            dialogue_items = parse_user_edited_transcript(edited_text, host_name or "Jane", guest_name or "John")
            custom_bg_music_path = _save_upload_to_temp(custom_bg_music_file)
            try:
                new_audio_bytes, new_transcript = regenerate_audio_from_dialogue(dialogue_items, custom_bg_music_path)
            finally:
                _remove_temp_file(custom_bg_music_path)
            regen_bar.progress(75)
            time.sleep(1.0)

            if new_audio_bytes is None:
                regen_bar.progress(100)
                st.error(new_transcript)
            else:
                regen_bar.progress(100)
                regen_text.write("✅ Regeneration complete!")
                st.success("Regenerated audio below:")
                st.session_state["audio_bytes"] = new_audio_bytes
                st.session_state["transcript"] = new_transcript
                st.session_state["transcript_original"] = new_transcript
                st.audio(new_audio_bytes, format='audio/mp3')
                st.download_button(
                    label="Download Edited Podcast (MP3)",
                    data=new_audio_bytes,
                    file_name="my_podcast_edited.mp3",
                    mime="audio/mpeg"
                )
                st.markdown("### Updated Transcript")
                st.markdown(new_transcript)
# Run the app only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()