Spaces:
Running
Running
Update utils.py
Browse files
utils.py
CHANGED
@@ -18,14 +18,14 @@ import torch
|
|
18 |
import random
|
19 |
|
20 |
class DialogueItem(BaseModel):
|
21 |
-
speaker: Literal["Jane", "John"] #
|
22 |
display_speaker: str = "Jane" # For display in transcript
|
23 |
text: str
|
24 |
|
25 |
class Dialogue(BaseModel):
|
26 |
dialogue: List[DialogueItem]
|
27 |
|
28 |
-
# Initialize Whisper
|
29 |
asr_pipeline = pipeline(
|
30 |
"automatic-speech-recognition",
|
31 |
model="openai/whisper-tiny.en",
|
@@ -33,10 +33,6 @@ asr_pipeline = pipeline(
|
|
33 |
)
|
34 |
|
35 |
def truncate_text(text, max_tokens=2048):
|
36 |
-
"""
|
37 |
-
If the text exceeds the max token limit (approx. 2,048), truncate it
|
38 |
-
to avoid exceeding the model's context window.
|
39 |
-
"""
|
40 |
print("[LOG] Truncating text if needed.")
|
41 |
tokenizer = tiktoken.get_encoding("cl100k_base")
|
42 |
tokens = tokenizer.encode(text)
|
@@ -46,10 +42,6 @@ def truncate_text(text, max_tokens=2048):
|
|
46 |
return text
|
47 |
|
48 |
def extract_text_from_url(url):
|
49 |
-
"""
|
50 |
-
Fetches and extracts readable text from a given URL
|
51 |
-
(stripping out scripts, styles, etc.).
|
52 |
-
"""
|
53 |
print("[LOG] Extracting text from URL:", url)
|
54 |
try:
|
55 |
headers = {
|
@@ -74,29 +66,17 @@ def extract_text_from_url(url):
|
|
74 |
return ""
|
75 |
|
76 |
def pitch_shift(audio: AudioSegment, semitones: int) -> AudioSegment:
|
77 |
-
"""
|
78 |
-
Shifts the pitch of an AudioSegment by a given number of semitones.
|
79 |
-
Positive semitones shift the pitch up, negative shifts it down.
|
80 |
-
"""
|
81 |
print(f"[LOG] Shifting pitch by {semitones} semitones.")
|
82 |
new_sample_rate = int(audio.frame_rate * (2.0 ** (semitones / 12.0)))
|
83 |
shifted_audio = audio._spawn(audio.raw_data, overrides={'frame_rate': new_sample_rate})
|
84 |
return shifted_audio.set_frame_rate(audio.frame_rate)
|
85 |
|
86 |
def is_sufficient(text: str, min_word_count: int = 500) -> bool:
|
87 |
-
"""
|
88 |
-
Checks if the fetched text meets our sufficiency criteria
|
89 |
-
(e.g., at least 500 words).
|
90 |
-
"""
|
91 |
word_count = len(text.split())
|
92 |
print(f"[DEBUG] Aggregated word count: {word_count}")
|
93 |
return word_count >= min_word_count
|
94 |
|
95 |
def query_llm_for_additional_info(topic: str, existing_text: str) -> str:
|
96 |
-
"""
|
97 |
-
Queries the Groq API to retrieve more info from the LLM's knowledge base.
|
98 |
-
Appends it to our aggregated info if found.
|
99 |
-
"""
|
100 |
print("[LOG] Querying LLM for additional information.")
|
101 |
system_prompt = (
|
102 |
"You are an AI assistant with extensive knowledge up to 2023-10. "
|
@@ -122,10 +102,6 @@ def query_llm_for_additional_info(topic: str, existing_text: str) -> str:
|
|
122 |
return additional_info
|
123 |
|
124 |
def research_topic(topic: str) -> str:
|
125 |
-
"""
|
126 |
-
Gathers info from various RSS feeds and Wikipedia. If needed, queries the LLM
|
127 |
-
for more data if the aggregated text is insufficient.
|
128 |
-
"""
|
129 |
sources = {
|
130 |
"BBC": "https://feeds.bbci.co.uk/news/rss.xml",
|
131 |
"CNN": "http://rss.cnn.com/rss/edition.rss",
|
@@ -136,15 +112,12 @@ def research_topic(topic: str) -> str:
|
|
136 |
"Economic Times": "https://economictimes.indiatimes.com/rssfeeds/1977021501.cms",
|
137 |
"Google News - Custom": f"https://news.google.com/rss/search?q={requests.utils.quote(topic)}&hl=en-IN&gl=IN&ceid=IN:en",
|
138 |
}
|
139 |
-
|
140 |
summary_parts = []
|
141 |
|
142 |
-
# Wikipedia summary
|
143 |
wiki_summary = fetch_wikipedia_summary(topic)
|
144 |
if wiki_summary:
|
145 |
summary_parts.append(f"From Wikipedia: {wiki_summary}")
|
146 |
|
147 |
-
# For each RSS feed
|
148 |
for name, feed_url in sources.items():
|
149 |
try:
|
150 |
items = fetch_rss_feed(feed_url)
|
@@ -165,7 +138,6 @@ def research_topic(topic: str) -> str:
|
|
165 |
print("[DEBUG] Aggregated info from primary sources:")
|
166 |
print(aggregated_info)
|
167 |
|
168 |
-
# Fallback to LLM if insufficient
|
169 |
if not is_sufficient(aggregated_info):
|
170 |
print("[LOG] Insufficient info from primary sources. Fallback to LLM.")
|
171 |
additional_info = query_llm_for_additional_info(topic, aggregated_info)
|
@@ -180,9 +152,6 @@ def research_topic(topic: str) -> str:
|
|
180 |
return aggregated_info
|
181 |
|
182 |
def fetch_wikipedia_summary(topic: str) -> str:
|
183 |
-
"""
|
184 |
-
Fetch a quick Wikipedia summary of the topic via the official Wikipedia API.
|
185 |
-
"""
|
186 |
print("[LOG] Fetching Wikipedia summary for:", topic)
|
187 |
try:
|
188 |
search_url = (
|
@@ -209,9 +178,6 @@ def fetch_wikipedia_summary(topic: str) -> str:
|
|
209 |
return ""
|
210 |
|
211 |
def fetch_rss_feed(feed_url: str) -> list:
|
212 |
-
"""
|
213 |
-
Pulls RSS feed data from a given URL and returns items.
|
214 |
-
"""
|
215 |
print("[LOG] Fetching RSS feed:", feed_url)
|
216 |
try:
|
217 |
resp = requests.get(feed_url)
|
@@ -226,10 +192,6 @@ def fetch_rss_feed(feed_url: str) -> list:
|
|
226 |
return []
|
227 |
|
228 |
def find_relevant_article(items, topic: str, min_match=2) -> tuple:
|
229 |
-
"""
|
230 |
-
Check each article in the RSS feed for mention of the topic
|
231 |
-
by counting the number of keyword matches.
|
232 |
-
"""
|
233 |
print("[LOG] Finding relevant articles...")
|
234 |
keywords = re.findall(r'\w+', topic.lower())
|
235 |
for item in items:
|
@@ -244,9 +206,6 @@ def find_relevant_article(items, topic: str, min_match=2) -> tuple:
|
|
244 |
return None, None, None
|
245 |
|
246 |
def fetch_article_text(link: str) -> str:
|
247 |
-
"""
|
248 |
-
Fetch the article text from the given link (first 5 paragraphs).
|
249 |
-
"""
|
250 |
print("[LOG] Fetching article text from:", link)
|
251 |
if not link:
|
252 |
print("[LOG] No link provided for article text.")
|
@@ -274,17 +233,9 @@ def generate_script(
|
|
274 |
guest_name: str = "John",
|
275 |
sponsor_style: str = "Separate Break"
|
276 |
):
|
277 |
-
"""
|
278 |
-
Sends the system_prompt plus input_text to the Groq LLM to generate a
|
279 |
-
multi-speaker Dialogue in JSON, returning a Dialogue object.
|
280 |
-
|
281 |
-
sponsor_style can be "Separate Break" or "Blended".
|
282 |
-
We add instructions telling the model how to integrate the sponsor content.
|
283 |
-
"""
|
284 |
print("[LOG] Generating script with tone:", tone, "and length:", target_length)
|
285 |
groq_client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
|
286 |
|
287 |
-
# Parse numeric minutes
|
288 |
words_per_minute = 150
|
289 |
numeric_minutes = 3
|
290 |
match = re.search(r"(\d+)", target_length)
|
@@ -302,13 +253,12 @@ def generate_script(
|
|
302 |
}
|
303 |
chosen_tone = tone_map.get(tone, "casual")
|
304 |
|
305 |
-
# Sponsor instructions
|
306 |
if sponsor_style == "Separate Break":
|
307 |
sponsor_instructions = (
|
308 |
"If sponsor content is provided, include it in a separate ad break (~30 seconds). "
|
309 |
"Use phrasing like 'Now a word from our sponsor...' and end with 'Back to the show' or similar."
|
310 |
)
|
311 |
-
else:
|
312 |
sponsor_instructions = (
|
313 |
"If sponsor content is provided, blend it naturally (~30 seconds) into the conversation. "
|
314 |
"Avoid abrupt transitions."
|
@@ -334,7 +284,6 @@ def generate_script(
|
|
334 |
" ]\n"
|
335 |
"}"
|
336 |
)
|
337 |
-
|
338 |
print("[LOG] Sending prompt to Groq:")
|
339 |
print(prompt)
|
340 |
|
@@ -361,7 +310,6 @@ def generate_script(
|
|
361 |
data = json.loads(json_str)
|
362 |
dialogue_list = data.get("dialogue", [])
|
363 |
|
364 |
-
# Map raw speaker -> Jane or John, storing display_speaker
|
365 |
for d in dialogue_list:
|
366 |
raw_speaker = d.get("speaker", "Jane")
|
367 |
if raw_speaker.lower() == host_name.lower():
|
@@ -371,7 +319,7 @@ def generate_script(
|
|
371 |
d["speaker"] = "John"
|
372 |
d["display_speaker"] = guest_name
|
373 |
else:
|
374 |
-
d["speaker"] = "Jane"
|
375 |
d["display_speaker"] = raw_speaker
|
376 |
|
377 |
new_dialogue_items = []
|
@@ -389,17 +337,8 @@ def generate_script(
|
|
389 |
print("[ERROR] JSON decoding failed:", e)
|
390 |
raise ValueError(f"Failed to parse dialogue: {str(e)}")
|
391 |
|
392 |
-
|
393 |
-
# Replaces the old approach for YouTube with RapidAPI
|
394 |
-
# -------------------------------------------------------
|
395 |
def transcribe_youtube_video(video_url: str) -> str:
|
396 |
-
"""
|
397 |
-
Transcribe a YouTube video by calling the RapidAPI 'youtube-transcriptor' endpoint.
|
398 |
-
1) Extract the 11-char video ID from the YouTube URL.
|
399 |
-
2) Call the RapidAPI endpoint (lang=en).
|
400 |
-
3) Parse 'transcriptionAsText' from the response.
|
401 |
-
4) Return that transcript as a string.
|
402 |
-
"""
|
403 |
print("[LOG] Transcribing YouTube video via RapidAPI:", video_url)
|
404 |
video_id_match = re.search(r"(?:v=|\/)([0-9A-Za-z_-]{11})", video_url)
|
405 |
if not video_id_match:
|
@@ -436,10 +375,7 @@ def transcribe_youtube_video(video_url: str) -> str:
|
|
436 |
|
437 |
print("[LOG] Transcript retrieval successful.")
|
438 |
print(f"[DEBUG] Transcript Length: {len(transcript_as_text)} characters.")
|
439 |
-
if len(transcript_as_text) > 200
|
440 |
-
snippet = transcript_as_text[:200] + "..."
|
441 |
-
else:
|
442 |
-
snippet = transcript_as_text
|
443 |
print(f"[DEBUG] Transcript Snippet: {snippet}")
|
444 |
|
445 |
return transcript_as_text
|
@@ -451,20 +387,18 @@ def transcribe_youtube_video(video_url: str) -> str:
|
|
451 |
def generate_audio_mp3(text: str, speaker: str) -> str:
|
452 |
"""
|
453 |
Calls Deepgram TTS with the text, returning a path to a temp MP3 file.
|
454 |
-
We also do some pre-processing for punctuation, abbreviations, etc.
|
455 |
"""
|
456 |
try:
|
457 |
print(f"[LOG] Generating audio for speaker: {speaker}")
|
458 |
-
|
459 |
-
# Preprocess text for TTS
|
460 |
processed_text = _preprocess_text_for_tts(text, speaker)
|
461 |
|
462 |
deepgram_api_url = "https://api.deepgram.com/v1/speak"
|
463 |
params = {
|
464 |
-
"model": "aura-asteria-en", # female
|
465 |
}
|
466 |
if speaker == "John":
|
467 |
-
params["model"] = "aura-zeus-en"
|
468 |
|
469 |
headers = {
|
470 |
"Accept": "audio/mpeg",
|
@@ -489,7 +423,7 @@ def generate_audio_mp3(text: str, speaker: str) -> str:
|
|
489 |
mp3_file.write(chunk)
|
490 |
mp3_path = mp3_file.name
|
491 |
|
492 |
-
# Normalize
|
493 |
audio_seg = AudioSegment.from_file(mp3_path, format="mp3")
|
494 |
audio_seg = effects.normalize(audio_seg)
|
495 |
|
@@ -500,75 +434,69 @@ def generate_audio_mp3(text: str, speaker: str) -> str:
|
|
500 |
os.remove(mp3_path)
|
501 |
|
502 |
return final_mp3_path
|
503 |
-
|
504 |
except Exception as e:
|
505 |
print("[ERROR] Error generating audio:", e)
|
506 |
raise ValueError(f"Error generating audio: {str(e)}")
|
507 |
|
508 |
def transcribe_youtube_video_OLD_YTDLP(video_url: str) -> str:
|
509 |
-
"""
|
510 |
-
Original ytdlp-based approach for local transcription. No longer used.
|
511 |
-
"""
|
512 |
pass
|
513 |
|
514 |
def _preprocess_text_for_tts(text: str, speaker: str) -> str:
|
515 |
"""
|
516 |
-
|
517 |
-
|
518 |
-
|
519 |
-
|
520 |
-
|
521 |
-
|
522 |
-
|
523 |
-
|
524 |
"""
|
525 |
-
# 1)
|
526 |
-
# We'll do this first so we don't insert periods for S-A-A-S inadvertently.
|
527 |
text = re.sub(r"\b(?i)SaaS\b", "sass", text)
|
528 |
|
529 |
-
# 2) Insert periods
|
530 |
-
|
531 |
-
|
532 |
-
abbr = match.group(0) # e.g. "CIA"
|
533 |
-
# Insert a period after each character: "C.I.A."
|
534 |
parted = ".".join(list(abbr)) + "."
|
535 |
return parted
|
536 |
-
|
537 |
-
# Insert periods for 2+ uppercase letters or digits, ignoring 'sass' we already replaced
|
538 |
text = re.sub(r"\b([A-Z0-9]{2,})\b", insert_periods_for_abbrev, text)
|
|
|
539 |
|
540 |
-
#
|
541 |
-
|
542 |
-
|
543 |
-
# 3) Now remove those periods from TTS so it won't say 'dot'
|
544 |
-
# "A.I." -> "A I", "C.I.A." -> "C I A", "F.1." -> "F 1"
|
545 |
-
def remove_periods_for_tts(match):
|
546 |
-
chunk = match.group(0)
|
547 |
-
# e.g. "C.I.A." => remove '.' => "C I A "
|
548 |
-
# Then strip trailing space => "C I A"
|
549 |
return chunk.replace(".", " ").strip()
|
550 |
-
|
551 |
-
# Matches things like "A.I." or "C.I.A." or "F.1."
|
552 |
text = re.sub(r"[A-Z0-9]\.[A-Z0-9](?:\.[A-Z0-9])*\.", remove_periods_for_tts, text)
|
553 |
|
554 |
-
#
|
555 |
text = re.sub(r"-", " ", text)
|
556 |
|
557 |
-
#
|
558 |
def convert_decimal(m):
|
559 |
number_str = m.group()
|
560 |
parts = number_str.split('.')
|
561 |
whole_part = _spell_digits(parts[0])
|
562 |
decimal_part = " ".join(_spell_digits(d) for d in parts[1])
|
563 |
return f"{whole_part} point {decimal_part}"
|
564 |
-
text = re.sub(r"\d+\.\d
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
565 |
|
566 |
-
# 6) Expand leftover all-caps abbreviations
|
567 |
-
# e.g. NASA -> "N A S A", if not already dotted
|
568 |
def expand_abbreviations(m):
|
569 |
abbrev = m.group()
|
570 |
-
# If it's plural: e.g. "MPs" -> "M Peas"
|
571 |
if abbrev.endswith('s') and abbrev[:-1].isupper():
|
|
|
572 |
singular = abbrev[:-1]
|
573 |
expanded = " ".join(list(singular)) + "s"
|
574 |
special_plurals = {
|
@@ -579,10 +507,9 @@ def _preprocess_text_for_tts(text: str, speaker: str) -> str:
|
|
579 |
return special_plurals.get(abbrev, expanded)
|
580 |
else:
|
581 |
return " ".join(list(abbrev))
|
582 |
-
|
583 |
text = re.sub(r"\b[A-Z]{2,}s?\b", expand_abbreviations, text)
|
584 |
|
585 |
-
# 7)
|
586 |
if speaker != "Jane":
|
587 |
def insert_thinking_pause(m):
|
588 |
word = m.group(1)
|
@@ -591,7 +518,6 @@ def _preprocess_text_for_tts(text: str, speaker: str) -> str:
|
|
591 |
return f"{word}..., {filler}"
|
592 |
else:
|
593 |
return f"{word}...,"
|
594 |
-
|
595 |
keywords_pattern = r"\b(important|significant|crucial|point|topic)\b"
|
596 |
text = re.sub(keywords_pattern, insert_thinking_pause, text, flags=re.IGNORECASE)
|
597 |
|
@@ -609,31 +535,57 @@ def _preprocess_text_for_tts(text: str, speaker: str) -> str:
|
|
609 |
|
610 |
return text.strip()
|
611 |
|
612 |
-
def
|
613 |
"""
|
614 |
-
|
|
|
615 |
"""
|
616 |
-
|
617 |
-
|
618 |
-
|
619 |
-
|
620 |
-
|
621 |
-
|
622 |
-
|
623 |
-
|
624 |
-
|
625 |
-
|
626 |
-
|
627 |
-
|
628 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
629 |
|
630 |
def mix_with_bg_music(spoken: AudioSegment, custom_music_path=None) -> AudioSegment:
|
631 |
-
"""
|
632 |
-
Mixes 'spoken' with a default bg_music.mp3 or user-provided custom music:
|
633 |
-
1) Start with 2 seconds of music alone before speech begins.
|
634 |
-
2) Loop music if shorter than final audio length.
|
635 |
-
3) Lower music volume so speech is clear.
|
636 |
-
"""
|
637 |
if custom_music_path:
|
638 |
music_path = custom_music_path
|
639 |
else:
|
@@ -645,7 +597,6 @@ def mix_with_bg_music(spoken: AudioSegment, custom_music_path=None) -> AudioSegm
|
|
645 |
print("[ERROR] Failed to load background music:", e)
|
646 |
return spoken
|
647 |
|
648 |
-
# Lower music volume
|
649 |
bg_music = bg_music - 18.0
|
650 |
|
651 |
total_length_ms = len(spoken) + 2000
|
|
|
18 |
import random
|
19 |
|
20 |
# One line of the generated podcast script.
class DialogueItem(BaseModel):
    # Canonical voice slot; generate_audio_mp3 switches the TTS model on "John".
    speaker: Literal["Jane", "John"]  # TTS voice
    # Name shown to the user; may differ from the voice slot above.
    display_speaker: str = "Jane"  # For display in transcript
    # The words spoken in this turn.
    text: str
|
24 |
|
25 |
# Container for a whole script: the ordered list of dialogue turns.
class Dialogue(BaseModel):
    dialogue: List[DialogueItem]
|
27 |
|
28 |
+
# Initialize Whisper (unused for YouTube with RapidAPI)
|
29 |
asr_pipeline = pipeline(
|
30 |
"automatic-speech-recognition",
|
31 |
model="openai/whisper-tiny.en",
|
|
|
33 |
)
|
34 |
|
35 |
def truncate_text(text, max_tokens=2048):
|
|
|
|
|
|
|
|
|
36 |
print("[LOG] Truncating text if needed.")
|
37 |
tokenizer = tiktoken.get_encoding("cl100k_base")
|
38 |
tokens = tokenizer.encode(text)
|
|
|
42 |
return text
|
43 |
|
44 |
def extract_text_from_url(url):
|
|
|
|
|
|
|
|
|
45 |
print("[LOG] Extracting text from URL:", url)
|
46 |
try:
|
47 |
headers = {
|
|
|
66 |
return ""
|
67 |
|
68 |
def pitch_shift(audio: AudioSegment, semitones: int) -> AudioSegment:
    """
    Shifts the pitch of an AudioSegment by a given number of semitones.
    Positive semitones shift the pitch up, negative shifts it down.

    The raw samples are re-labelled with a frame rate scaled by
    2 ** (semitones / 12), then the result is converted back to the
    original frame rate so callers get audio at the rate they passed in.
    """
    print(f"[LOG] Shifting pitch by {semitones} semitones.")
    # One semitone is a factor of 2 ** (1/12) in frequency.
    new_sample_rate = int(audio.frame_rate * (2.0 ** (semitones / 12.0)))
    # Same bytes, different declared frame rate.
    shifted_audio = audio._spawn(audio.raw_data, overrides={'frame_rate': new_sample_rate})
    return shifted_audio.set_frame_rate(audio.frame_rate)
|
73 |
|
74 |
def is_sufficient(text: str, min_word_count: int = 500) -> bool:
    """
    Checks if the fetched text meets our sufficiency criteria
    (e.g., at least 500 words).

    Words are counted by splitting on whitespace. The count is also
    printed as a debug line.

    Args:
        text: The aggregated text to measure.
        min_word_count: Minimum number of words required (inclusive).

    Returns:
        True if the word count is at least ``min_word_count``.
    """
    word_count = len(text.split())
    print(f"[DEBUG] Aggregated word count: {word_count}")
    return word_count >= min_word_count
|
78 |
|
79 |
def query_llm_for_additional_info(topic: str, existing_text: str) -> str:
|
|
|
|
|
|
|
|
|
80 |
print("[LOG] Querying LLM for additional information.")
|
81 |
system_prompt = (
|
82 |
"You are an AI assistant with extensive knowledge up to 2023-10. "
|
|
|
102 |
return additional_info
|
103 |
|
104 |
def research_topic(topic: str) -> str:
|
|
|
|
|
|
|
|
|
105 |
sources = {
|
106 |
"BBC": "https://feeds.bbci.co.uk/news/rss.xml",
|
107 |
"CNN": "http://rss.cnn.com/rss/edition.rss",
|
|
|
112 |
"Economic Times": "https://economictimes.indiatimes.com/rssfeeds/1977021501.cms",
|
113 |
"Google News - Custom": f"https://news.google.com/rss/search?q={requests.utils.quote(topic)}&hl=en-IN&gl=IN&ceid=IN:en",
|
114 |
}
|
|
|
115 |
summary_parts = []
|
116 |
|
|
|
117 |
wiki_summary = fetch_wikipedia_summary(topic)
|
118 |
if wiki_summary:
|
119 |
summary_parts.append(f"From Wikipedia: {wiki_summary}")
|
120 |
|
|
|
121 |
for name, feed_url in sources.items():
|
122 |
try:
|
123 |
items = fetch_rss_feed(feed_url)
|
|
|
138 |
print("[DEBUG] Aggregated info from primary sources:")
|
139 |
print(aggregated_info)
|
140 |
|
|
|
141 |
if not is_sufficient(aggregated_info):
|
142 |
print("[LOG] Insufficient info from primary sources. Fallback to LLM.")
|
143 |
additional_info = query_llm_for_additional_info(topic, aggregated_info)
|
|
|
152 |
return aggregated_info
|
153 |
|
154 |
def fetch_wikipedia_summary(topic: str) -> str:
|
|
|
|
|
|
|
155 |
print("[LOG] Fetching Wikipedia summary for:", topic)
|
156 |
try:
|
157 |
search_url = (
|
|
|
178 |
return ""
|
179 |
|
180 |
def fetch_rss_feed(feed_url: str) -> list:
|
|
|
|
|
|
|
181 |
print("[LOG] Fetching RSS feed:", feed_url)
|
182 |
try:
|
183 |
resp = requests.get(feed_url)
|
|
|
192 |
return []
|
193 |
|
194 |
def find_relevant_article(items, topic: str, min_match=2) -> tuple:
|
|
|
|
|
|
|
|
|
195 |
print("[LOG] Finding relevant articles...")
|
196 |
keywords = re.findall(r'\w+', topic.lower())
|
197 |
for item in items:
|
|
|
206 |
return None, None, None
|
207 |
|
208 |
def fetch_article_text(link: str) -> str:
|
|
|
|
|
|
|
209 |
print("[LOG] Fetching article text from:", link)
|
210 |
if not link:
|
211 |
print("[LOG] No link provided for article text.")
|
|
|
233 |
guest_name: str = "John",
|
234 |
sponsor_style: str = "Separate Break"
|
235 |
):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
236 |
print("[LOG] Generating script with tone:", tone, "and length:", target_length)
|
237 |
groq_client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
|
238 |
|
|
|
239 |
words_per_minute = 150
|
240 |
numeric_minutes = 3
|
241 |
match = re.search(r"(\d+)", target_length)
|
|
|
253 |
}
|
254 |
chosen_tone = tone_map.get(tone, "casual")
|
255 |
|
|
|
256 |
if sponsor_style == "Separate Break":
|
257 |
sponsor_instructions = (
|
258 |
"If sponsor content is provided, include it in a separate ad break (~30 seconds). "
|
259 |
"Use phrasing like 'Now a word from our sponsor...' and end with 'Back to the show' or similar."
|
260 |
)
|
261 |
+
else: # Blended
|
262 |
sponsor_instructions = (
|
263 |
"If sponsor content is provided, blend it naturally (~30 seconds) into the conversation. "
|
264 |
"Avoid abrupt transitions."
|
|
|
284 |
" ]\n"
|
285 |
"}"
|
286 |
)
|
|
|
287 |
print("[LOG] Sending prompt to Groq:")
|
288 |
print(prompt)
|
289 |
|
|
|
310 |
data = json.loads(json_str)
|
311 |
dialogue_list = data.get("dialogue", [])
|
312 |
|
|
|
313 |
for d in dialogue_list:
|
314 |
raw_speaker = d.get("speaker", "Jane")
|
315 |
if raw_speaker.lower() == host_name.lower():
|
|
|
319 |
d["speaker"] = "John"
|
320 |
d["display_speaker"] = guest_name
|
321 |
else:
|
322 |
+
d["speaker"] = "Jane"
|
323 |
d["display_speaker"] = raw_speaker
|
324 |
|
325 |
new_dialogue_items = []
|
|
|
337 |
print("[ERROR] JSON decoding failed:", e)
|
338 |
raise ValueError(f"Failed to parse dialogue: {str(e)}")
|
339 |
|
340 |
+
|
|
|
|
|
341 |
def transcribe_youtube_video(video_url: str) -> str:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
342 |
print("[LOG] Transcribing YouTube video via RapidAPI:", video_url)
|
343 |
video_id_match = re.search(r"(?:v=|\/)([0-9A-Za-z_-]{11})", video_url)
|
344 |
if not video_id_match:
|
|
|
375 |
|
376 |
print("[LOG] Transcript retrieval successful.")
|
377 |
print(f"[DEBUG] Transcript Length: {len(transcript_as_text)} characters.")
|
378 |
+
snippet = transcript_as_text[:200] + "..." if len(transcript_as_text) > 200 else transcript_as_text
|
|
|
|
|
|
|
379 |
print(f"[DEBUG] Transcript Snippet: {snippet}")
|
380 |
|
381 |
return transcript_as_text
|
|
|
387 |
def generate_audio_mp3(text: str, speaker: str) -> str:
|
388 |
"""
|
389 |
Calls Deepgram TTS with the text, returning a path to a temp MP3 file.
|
390 |
+
We also do some pre-processing for punctuation, abbreviations, numeric expansions, etc.
|
391 |
"""
|
392 |
try:
|
393 |
print(f"[LOG] Generating audio for speaker: {speaker}")
|
|
|
|
|
394 |
processed_text = _preprocess_text_for_tts(text, speaker)
|
395 |
|
396 |
deepgram_api_url = "https://api.deepgram.com/v1/speak"
|
397 |
params = {
|
398 |
+
"model": "aura-asteria-en", # female by default
|
399 |
}
|
400 |
if speaker == "John":
|
401 |
+
params["model"] = "aura-zeus-en"
|
402 |
|
403 |
headers = {
|
404 |
"Accept": "audio/mpeg",
|
|
|
423 |
mp3_file.write(chunk)
|
424 |
mp3_path = mp3_file.name
|
425 |
|
426 |
+
# Normalize
|
427 |
audio_seg = AudioSegment.from_file(mp3_path, format="mp3")
|
428 |
audio_seg = effects.normalize(audio_seg)
|
429 |
|
|
|
434 |
os.remove(mp3_path)
|
435 |
|
436 |
return final_mp3_path
|
|
|
437 |
except Exception as e:
|
438 |
print("[ERROR] Error generating audio:", e)
|
439 |
raise ValueError(f"Error generating audio: {str(e)}")
|
440 |
|
441 |
def transcribe_youtube_video_OLD_YTDLP(video_url: str) -> str:
    """
    Original ytdlp-based approach for local transcription. No longer used.

    Kept only as a named stub: the body does nothing and returns None
    regardless of ``video_url`` (despite the ``str`` return annotation).
    """
    pass
|
443 |
|
444 |
def _preprocess_text_for_tts(text: str, speaker: str) -> str:
|
445 |
"""
|
446 |
+
1) "SaaS" => "sass"
|
447 |
+
2) Insert periods for uppercase abbreviations => remove them for TTS
|
448 |
+
3) Convert decimals "3.14" => "three point one four"
|
449 |
+
4) For pure integer numbers (e.g. "10", "2023") => "ten", "two thousand twenty three"
|
450 |
+
5) Expand leftover all-caps
|
451 |
+
6) Insert fillers if speaker != "Jane"
|
452 |
+
7) Remove random fillers
|
453 |
+
8) Capitalize sentence starts
|
454 |
"""
|
455 |
+
# 1) "SaaS" => "sass"
|
|
|
456 |
text = re.sub(r"\b(?i)SaaS\b", "sass", text)
|
457 |
|
458 |
+
# 2) Insert periods for uppercase abbreviations of length >=2 => e.g. "CIA" -> "C.I.A."
|
459 |
+
def insert_periods_for_abbrev(m):
|
460 |
+
abbr = m.group(0)
|
|
|
|
|
461 |
parted = ".".join(list(abbr)) + "."
|
462 |
return parted
|
|
|
|
|
463 |
text = re.sub(r"\b([A-Z0-9]{2,})\b", insert_periods_for_abbrev, text)
|
464 |
+
text = re.sub(r"\.\.", ".", text) # remove double-dots
|
465 |
|
466 |
+
# 2b) Then remove those periods => TTS won't say "dot"
|
467 |
+
def remove_periods_for_tts(m):
|
468 |
+
chunk = m.group(0)
|
|
|
|
|
|
|
|
|
|
|
|
|
469 |
return chunk.replace(".", " ").strip()
|
|
|
|
|
470 |
text = re.sub(r"[A-Z0-9]\.[A-Z0-9](?:\.[A-Z0-9])*\.", remove_periods_for_tts, text)
|
471 |
|
472 |
+
# 3) Hyphens -> spaces
|
473 |
text = re.sub(r"-", " ", text)
|
474 |
|
475 |
+
# 4) Convert decimals like "3.14" => "three point one four"
|
476 |
def convert_decimal(m):
|
477 |
number_str = m.group()
|
478 |
parts = number_str.split('.')
|
479 |
whole_part = _spell_digits(parts[0])
|
480 |
decimal_part = " ".join(_spell_digits(d) for d in parts[1])
|
481 |
return f"{whole_part} point {decimal_part}"
|
482 |
+
text = re.sub(r"\b\d+\.\d+\b", convert_decimal, text)
|
483 |
+
|
484 |
+
# 5) Convert pure integer numbers => e.g. "10" -> "ten", "42" -> "forty two"
|
485 |
+
# We'll do a quick function for small-ish integers (up to 9999 for demo).
|
486 |
+
def convert_int_to_words(m):
|
487 |
+
num_str = m.group()
|
488 |
+
# e.g. "10" => 10 => "ten"
|
489 |
+
# "2023" => "two thousand twenty three"
|
490 |
+
# For brevity, handle up to 99999 or so. Or you can import "num2words" for a robust approach.
|
491 |
+
return number_to_words(int(num_str))
|
492 |
+
|
493 |
+
text = re.sub(r"\b\d+\b", convert_int_to_words, text)
|
494 |
|
495 |
+
# 6) Expand leftover all-caps abbreviations => "NASA" => "N A S A"
|
|
|
496 |
def expand_abbreviations(m):
|
497 |
abbrev = m.group()
|
|
|
498 |
if abbrev.endswith('s') and abbrev[:-1].isupper():
|
499 |
+
# Plural e.g. "MPs" => "M Peas"
|
500 |
singular = abbrev[:-1]
|
501 |
expanded = " ".join(list(singular)) + "s"
|
502 |
special_plurals = {
|
|
|
507 |
return special_plurals.get(abbrev, expanded)
|
508 |
else:
|
509 |
return " ".join(list(abbrev))
|
|
|
510 |
text = re.sub(r"\b[A-Z]{2,}s?\b", expand_abbreviations, text)
|
511 |
|
512 |
+
# 7) If speaker != Jane, insert filler words around certain keywords
|
513 |
if speaker != "Jane":
|
514 |
def insert_thinking_pause(m):
|
515 |
word = m.group(1)
|
|
|
518 |
return f"{word}..., {filler}"
|
519 |
else:
|
520 |
return f"{word}...,"
|
|
|
521 |
keywords_pattern = r"\b(important|significant|crucial|point|topic)\b"
|
522 |
text = re.sub(keywords_pattern, insert_thinking_pause, text, flags=re.IGNORECASE)
|
523 |
|
|
|
535 |
|
536 |
return text.strip()
|
537 |
|
538 |
+
def number_to_words(n: int) -> str:
    """
    Convert an integer into space-separated English words for TTS.

    Output uses no hyphens and inserts "and" before the final tens/units
    when something precedes them, e.g. 42 -> "forty two",
    101 -> "one hundred and one", 2023 -> "two thousand and twenty three".

    Values up to 999,999,999,999,999 are spelled with thousand / million /
    billion / trillion groups. Larger values are read digit by digit
    ("one zero zero ...") rather than raising, because this function is
    applied to every integer found in free text.

    Args:
        n: The integer to spell out. Negative values are prefixed with "minus".

    Returns:
        The number in words; "zero" for 0.
    """
    if n == 0:
        return "zero"

    if n < 0:
        return "minus " + number_to_words(-n)

    ones = ("", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine")
    teens = (
        "ten", "eleven", "twelve", "thirteen", "fourteen",
        "fifteen", "sixteen", "seventeen", "eighteen", "nineteen",
    )
    tens_words = ("", "", "twenty", "thirty", "forty", "fifty", "sixty", "seventy", "eighty", "ninety")
    # Index i names the group worth 1000 ** i.
    scales = ("", "thousand", "million", "billion", "trillion")

    # Beyond the scale table: fall back to reading the digits one by one
    # so that arbitrarily long digit runs never crash preprocessing.
    if n >= 1000 ** len(scales):
        digit_names = ("zero",) + ones[1:]
        return " ".join(digit_names[int(ch)] for ch in str(n))

    def below_hundred(x: int) -> str:
        """Spell 1..99 (caller guarantees x > 0)."""
        if x < 10:
            return ones[x]
        if x < 20:
            return teens[x - 10]
        tens_part, ones_part = divmod(x, 10)
        if ones_part:
            return f"{tens_words[tens_part]} {ones[ones_part]}"
        return tens_words[tens_part]

    def below_thousand(x: int, force_and: bool) -> list:
        """Spell 1..999 as a word list; force_and adds "and" even without hundreds."""
        hundreds, rest = divmod(x, 100)
        parts = []
        if hundreds:
            parts.extend((ones[hundreds], "hundred"))
        if rest:
            if hundreds or force_and:
                parts.append("and")
            parts.append(below_hundred(rest))
        return parts

    # Split into base-1000 groups, least significant first.
    groups = []
    remaining = n
    while remaining:
        remaining, group = divmod(remaining, 1000)
        groups.append(group)

    words = []
    for idx in range(len(groups) - 1, -1, -1):
        group = groups[idx]
        if not group:
            continue
        # Only the final (units) group gets the joining "and" after higher groups,
        # matching the previous output such as "two thousand and twenty three".
        words.extend(below_thousand(group, force_and=(idx == 0 and bool(words))))
        if scales[idx]:
            words.append(scales[idx])

    return " ".join(words)
|
587 |
|
588 |
def mix_with_bg_music(spoken: AudioSegment, custom_music_path=None) -> AudioSegment:
|
|
|
|
|
|
|
|
|
|
|
|
|
589 |
if custom_music_path:
|
590 |
music_path = custom_music_path
|
591 |
else:
|
|
|
597 |
print("[ERROR] Failed to load background music:", e)
|
598 |
return spoken
|
599 |
|
|
|
600 |
bg_music = bg_music - 18.0
|
601 |
|
602 |
total_length_ms = len(spoken) + 2000
|