siddhartharyaai committed on
Commit
aacfe72
·
verified ·
1 Parent(s): e4e7996

Update utils.py

Browse files
Files changed (1) hide show
  1. utils.py +89 -35
utils.py CHANGED
@@ -33,6 +33,10 @@ asr_pipeline = pipeline(
33
  )
34
 
35
  def truncate_text(text, max_tokens=2048):
 
 
 
 
36
  print("[LOG] Truncating text if needed.")
37
  tokenizer = tiktoken.get_encoding("cl100k_base")
38
  tokens = tokenizer.encode(text)
@@ -42,6 +46,10 @@ def truncate_text(text, max_tokens=2048):
42
  return text
43
 
44
  def extract_text_from_url(url):
 
 
 
 
45
  print("[LOG] Extracting text from URL:", url)
46
  try:
47
  headers = {
@@ -66,17 +74,29 @@ def extract_text_from_url(url):
66
  return ""
67
 
68
def pitch_shift(audio: AudioSegment, semitones: int) -> AudioSegment:
    """
    Shift the pitch of *audio* by *semitones* via resampling: replay the raw
    frames at a scaled frame rate, then restore the original frame rate.
    """
    print(f"[LOG] Shifting pitch by {semitones} semitones.")
    rate_factor = 2.0 ** (semitones / 12.0)
    resampled = audio._spawn(
        audio.raw_data,
        overrides={'frame_rate': int(audio.frame_rate * rate_factor)},
    )
    return resampled.set_frame_rate(audio.frame_rate)
73
 
74
def is_sufficient(text: str, min_word_count: int = 500) -> bool:
    """Return True when *text* has at least *min_word_count* whitespace-separated words."""
    n_words = len(text.split())
    print(f"[DEBUG] Aggregated word count: {n_words}")
    return n_words >= min_word_count
78
 
79
  def query_llm_for_additional_info(topic: str, existing_text: str) -> str:
 
 
 
 
80
  print("[LOG] Querying LLM for additional information.")
81
  system_prompt = (
82
  "You are an AI assistant with extensive knowledge up to 2023-10. "
@@ -102,6 +122,10 @@ def query_llm_for_additional_info(topic: str, existing_text: str) -> str:
102
  return additional_info
103
 
104
  def research_topic(topic: str) -> str:
 
 
 
 
105
  sources = {
106
  "BBC": "https://feeds.bbci.co.uk/news/rss.xml",
107
  "CNN": "http://rss.cnn.com/rss/edition.rss",
@@ -112,12 +136,15 @@ def research_topic(topic: str) -> str:
112
  "Economic Times": "https://economictimes.indiatimes.com/rssfeeds/1977021501.cms",
113
  "Google News - Custom": f"https://news.google.com/rss/search?q={requests.utils.quote(topic)}&hl=en-IN&gl=IN&ceid=IN:en",
114
  }
 
115
  summary_parts = []
116
 
 
117
  wiki_summary = fetch_wikipedia_summary(topic)
118
  if wiki_summary:
119
  summary_parts.append(f"From Wikipedia: {wiki_summary}")
120
 
 
121
  for name, feed_url in sources.items():
122
  try:
123
  items = fetch_rss_feed(feed_url)
@@ -138,6 +165,7 @@ def research_topic(topic: str) -> str:
138
  print("[DEBUG] Aggregated info from primary sources:")
139
  print(aggregated_info)
140
 
 
141
  if not is_sufficient(aggregated_info):
142
  print("[LOG] Insufficient info from primary sources. Fallback to LLM.")
143
  additional_info = query_llm_for_additional_info(topic, aggregated_info)
@@ -152,6 +180,9 @@ def research_topic(topic: str) -> str:
152
  return aggregated_info
153
 
154
  def fetch_wikipedia_summary(topic: str) -> str:
 
 
 
155
  print("[LOG] Fetching Wikipedia summary for:", topic)
156
  try:
157
  search_url = (
@@ -178,6 +209,9 @@ def fetch_wikipedia_summary(topic: str) -> str:
178
  return ""
179
 
180
  def fetch_rss_feed(feed_url: str) -> list:
 
 
 
181
  print("[LOG] Fetching RSS feed:", feed_url)
182
  try:
183
  resp = requests.get(feed_url)
@@ -192,6 +226,10 @@ def fetch_rss_feed(feed_url: str) -> list:
192
  return []
193
 
194
  def find_relevant_article(items, topic: str, min_match=2) -> tuple:
 
 
 
 
195
  print("[LOG] Finding relevant articles...")
196
  keywords = re.findall(r'\w+', topic.lower())
197
  for item in items:
@@ -206,6 +244,9 @@ def find_relevant_article(items, topic: str, min_match=2) -> tuple:
206
  return None, None, None
207
 
208
  def fetch_article_text(link: str) -> str:
 
 
 
209
  print("[LOG] Fetching article text from:", link)
210
  if not link:
211
  print("[LOG] No link provided for article text.")
@@ -233,6 +274,13 @@ def generate_script(
233
  guest_name: str = "John",
234
  sponsor_style: str = "Separate Break"
235
  ):
 
 
 
 
 
 
 
236
  print("[LOG] Generating script with tone:", tone, "and length:", target_length)
237
  groq_client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
238
 
@@ -387,7 +435,8 @@ def transcribe_youtube_video(video_url: str) -> str:
387
  def generate_audio_mp3(text: str, speaker: str) -> str:
388
  """
389
  Calls Deepgram TTS with the text, returning a path to a temp MP3 file.
390
- We also do some pre-processing for punctuation, abbreviations, numeric expansions, etc.
 
391
  """
392
  try:
393
  print(f"[LOG] Generating audio for speaker: {speaker}")
@@ -444,26 +493,25 @@ def transcribe_youtube_video_OLD_YTDLP(video_url: str) -> str:
444
  def _preprocess_text_for_tts(text: str, speaker: str) -> str:
445
  """
446
  1) "SaaS" => "sass"
447
- 2) Insert periods for uppercase abbreviations => remove them for TTS
448
- 3) Convert decimals "3.14" => "three point one four"
449
- 4) For pure integer numbers (e.g. "10", "2023") => "ten", "two thousand twenty three"
450
  5) Expand leftover all-caps
451
- 6) Insert fillers if speaker != "Jane"
452
- 7) Remove random fillers
453
- 8) Capitalize sentence starts
 
454
  """
455
  # 1) "SaaS" => "sass"
456
  text = re.sub(r"\b(?i)SaaS\b", "sass", text)
457
 
458
- # 2) Insert periods for uppercase abbreviations of length >=2 => e.g. "CIA" -> "C.I.A."
459
  def insert_periods_for_abbrev(m):
460
  abbr = m.group(0)
461
  parted = ".".join(list(abbr)) + "."
462
  return parted
463
  text = re.sub(r"\b([A-Z0-9]{2,})\b", insert_periods_for_abbrev, text)
464
- text = re.sub(r"\.\.", ".", text) # remove double-dots
465
-
466
- # 2b) Then remove those periods => TTS won't say "dot"
467
  def remove_periods_for_tts(m):
468
  chunk = m.group(0)
469
  return chunk.replace(".", " ").strip()
@@ -472,7 +520,7 @@ def _preprocess_text_for_tts(text: str, speaker: str) -> str:
472
  # 3) Hyphens -> spaces
473
  text = re.sub(r"-", " ", text)
474
 
475
- # 4) Convert decimals like "3.14" => "three point one four"
476
  def convert_decimal(m):
477
  number_str = m.group()
478
  parts = number_str.split('.')
@@ -481,22 +529,16 @@ def _preprocess_text_for_tts(text: str, speaker: str) -> str:
481
  return f"{whole_part} point {decimal_part}"
482
  text = re.sub(r"\b\d+\.\d+\b", convert_decimal, text)
483
 
484
- # 5) Convert pure integer numbers => e.g. "10" -> "ten", "42" -> "forty two"
485
- # We'll do a quick function for small-ish integers (up to 9999 for demo).
486
  def convert_int_to_words(m):
487
  num_str = m.group()
488
- # e.g. "10" => 10 => "ten"
489
- # "2023" => "two thousand twenty three"
490
- # For brevity, handle up to 99999 or so. Or you can import "num2words" for a robust approach.
491
  return number_to_words(int(num_str))
492
-
493
  text = re.sub(r"\b\d+\b", convert_int_to_words, text)
494
 
495
- # 6) Expand leftover all-caps abbreviations => "NASA" => "N A S A"
496
  def expand_abbreviations(m):
497
  abbrev = m.group()
498
  if abbrev.endswith('s') and abbrev[:-1].isupper():
499
- # Plural e.g. "MPs" => "M Peas"
500
  singular = abbrev[:-1]
501
  expanded = " ".join(list(singular)) + "s"
502
  special_plurals = {
@@ -509,7 +551,15 @@ def _preprocess_text_for_tts(text: str, speaker: str) -> str:
509
  return " ".join(list(abbrev))
510
  text = re.sub(r"\b[A-Z]{2,}s?\b", expand_abbreviations, text)
511
 
512
- # 7) If speaker != Jane, insert filler words around certain keywords
 
 
 
 
 
 
 
 
513
  if speaker != "Jane":
514
  def insert_thinking_pause(m):
515
  word = m.group(1)
@@ -521,14 +571,13 @@ def _preprocess_text_for_tts(text: str, speaker: str) -> str:
521
  keywords_pattern = r"\b(important|significant|crucial|point|topic)\b"
522
  text = re.sub(keywords_pattern, insert_thinking_pause, text, flags=re.IGNORECASE)
523
 
524
- # Insert dynamic pauses for certain conjunctions
525
  conj_pattern = r"\b(and|but|so|because|however)\b"
526
  text = re.sub(conj_pattern, lambda m: f"{m.group()}...", text, flags=re.IGNORECASE)
527
 
528
- # 8) Remove random fillers
529
  text = re.sub(r"\b(uh|um|ah)\b", "", text, flags=re.IGNORECASE)
530
 
531
- # 9) Capitalize sentence starts
532
  def capitalize_match(m):
533
  return m.group().upper()
534
  text = re.sub(r'(^\s*\w)|([.!?]\s*\w)', capitalize_match, text)
@@ -537,8 +586,8 @@ def _preprocess_text_for_tts(text: str, speaker: str) -> str:
537
 
538
  def number_to_words(n: int) -> str:
539
  """
540
- Very simple function to convert integers up to 99999 into words for TTS.
541
- If you want a robust approach, consider the 'num2words' library.
542
  """
543
  if n == 0:
544
  return "zero"
@@ -546,12 +595,14 @@ def number_to_words(n: int) -> str:
546
  if n < 0:
547
  return "minus " + number_to_words(abs(n))
548
 
549
- # Basic chunking
550
  ones = ["","one","two","three","four","five","six","seven","eight","nine"]
551
- teens = ["ten","eleven","twelve","thirteen","fourteen","fifteen","sixteen","seventeen","eighteen","nineteen"]
552
- tens_words = ["","","twenty","thirty","forty","fifty","sixty","seventy","eighty","ninety"]
 
 
553
 
554
  words = []
 
555
  def two_digit_word(x):
556
  if x == 0:
557
  return ""
@@ -559,19 +610,16 @@ def number_to_words(n: int) -> str:
559
  return ones[x]
560
  if 10 <= x < 20:
561
  return teens[x-10]
562
- # 20+
563
- tens_part = x // 10
564
- ones_part = x % 10
565
- return tens_words[tens_part] + (f" {ones[ones_part]}" if ones_part else "")
566
 
567
- # Handle thousands
568
  thousands = n // 1000
569
  remainder = n % 1000
570
  if thousands > 0:
571
  words.append(two_digit_word(thousands))
572
  words.append("thousand")
573
 
574
- # Handle hundreds
575
  hundreds = remainder // 100
576
  last_two = remainder % 100
577
  if hundreds > 0:
@@ -586,6 +634,12 @@ def number_to_words(n: int) -> str:
586
  return " ".join(w for w in words if w).strip()
587
 
588
  def mix_with_bg_music(spoken: AudioSegment, custom_music_path=None) -> AudioSegment:
 
 
 
 
 
 
589
  if custom_music_path:
590
  music_path = custom_music_path
591
  else:
 
33
  )
34
 
35
  def truncate_text(text, max_tokens=2048):
36
+ """
37
+ If the text exceeds the max token limit (approx. 2,048), truncate it
38
+ to avoid exceeding the model's context window.
39
+ """
40
  print("[LOG] Truncating text if needed.")
41
  tokenizer = tiktoken.get_encoding("cl100k_base")
42
  tokens = tokenizer.encode(text)
 
46
  return text
47
 
48
  def extract_text_from_url(url):
49
+ """
50
+ Fetches and extracts readable text from a given URL
51
+ (stripping out scripts, styles, etc.).
52
+ """
53
  print("[LOG] Extracting text from URL:", url)
54
  try:
55
  headers = {
 
74
  return ""
75
 
76
def pitch_shift(audio: AudioSegment, semitones: int) -> AudioSegment:
    """
    Shifts the pitch of an AudioSegment by a given number of semitones.
    Positive semitones shift the pitch up, negative shifts it down.
    """
    print(f"[LOG] Shifting pitch by {semitones} semitones.")
    # Scale the frame rate by 2^(semitones/12), then set it back — the
    # resample trick changes the perceived pitch of the same raw data.
    scaled_rate = int(audio.frame_rate * 2.0 ** (semitones / 12.0))
    shifted = audio._spawn(audio.raw_data, overrides={'frame_rate': scaled_rate})
    return shifted.set_frame_rate(audio.frame_rate)
85
 
86
def is_sufficient(text: str, min_word_count: int = 500) -> bool:
    """
    Checks if the fetched text meets our sufficiency criteria
    (e.g., at least 500 words).
    """
    words = text.split()
    print(f"[DEBUG] Aggregated word count: {len(words)}")
    return len(words) >= min_word_count
94
 
95
  def query_llm_for_additional_info(topic: str, existing_text: str) -> str:
96
+ """
97
+ Queries the Groq API to retrieve more info from the LLM's knowledge base.
98
+ Appends it to our aggregated info if found.
99
+ """
100
  print("[LOG] Querying LLM for additional information.")
101
  system_prompt = (
102
  "You are an AI assistant with extensive knowledge up to 2023-10. "
 
122
  return additional_info
123
 
124
  def research_topic(topic: str) -> str:
125
+ """
126
+ Gathers info from various RSS feeds and Wikipedia. If needed, queries the LLM
127
+ for more data if the aggregated text is insufficient.
128
+ """
129
  sources = {
130
  "BBC": "https://feeds.bbci.co.uk/news/rss.xml",
131
  "CNN": "http://rss.cnn.com/rss/edition.rss",
 
136
  "Economic Times": "https://economictimes.indiatimes.com/rssfeeds/1977021501.cms",
137
  "Google News - Custom": f"https://news.google.com/rss/search?q={requests.utils.quote(topic)}&hl=en-IN&gl=IN&ceid=IN:en",
138
  }
139
+
140
  summary_parts = []
141
 
142
+ # Wikipedia summary
143
  wiki_summary = fetch_wikipedia_summary(topic)
144
  if wiki_summary:
145
  summary_parts.append(f"From Wikipedia: {wiki_summary}")
146
 
147
+ # For each RSS feed
148
  for name, feed_url in sources.items():
149
  try:
150
  items = fetch_rss_feed(feed_url)
 
165
  print("[DEBUG] Aggregated info from primary sources:")
166
  print(aggregated_info)
167
 
168
+ # Fallback to LLM if insufficient
169
  if not is_sufficient(aggregated_info):
170
  print("[LOG] Insufficient info from primary sources. Fallback to LLM.")
171
  additional_info = query_llm_for_additional_info(topic, aggregated_info)
 
180
  return aggregated_info
181
 
182
  def fetch_wikipedia_summary(topic: str) -> str:
183
+ """
184
+ Fetch a quick Wikipedia summary of the topic via the official Wikipedia API.
185
+ """
186
  print("[LOG] Fetching Wikipedia summary for:", topic)
187
  try:
188
  search_url = (
 
209
  return ""
210
 
211
  def fetch_rss_feed(feed_url: str) -> list:
212
+ """
213
+ Pulls RSS feed data from a given URL and returns items.
214
+ """
215
  print("[LOG] Fetching RSS feed:", feed_url)
216
  try:
217
  resp = requests.get(feed_url)
 
226
  return []
227
 
228
  def find_relevant_article(items, topic: str, min_match=2) -> tuple:
229
+ """
230
+ Check each article in the RSS feed for mention of the topic
231
+ by counting the number of keyword matches.
232
+ """
233
  print("[LOG] Finding relevant articles...")
234
  keywords = re.findall(r'\w+', topic.lower())
235
  for item in items:
 
244
  return None, None, None
245
 
246
  def fetch_article_text(link: str) -> str:
247
+ """
248
+ Fetch the article text from the given link (first 5 paragraphs).
249
+ """
250
  print("[LOG] Fetching article text from:", link)
251
  if not link:
252
  print("[LOG] No link provided for article text.")
 
274
  guest_name: str = "John",
275
  sponsor_style: str = "Separate Break"
276
  ):
277
+ """
278
+ Sends the system_prompt plus input_text to the Groq LLM to generate a
279
+ multi-speaker Dialogue in JSON, returning a Dialogue object.
280
+
281
+ sponsor_style can be "Separate Break" or "Blended".
282
+ We add instructions telling the model how to integrate the sponsor content.
283
+ """
284
  print("[LOG] Generating script with tone:", tone, "and length:", target_length)
285
  groq_client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
286
 
 
435
  def generate_audio_mp3(text: str, speaker: str) -> str:
436
  """
437
  Calls Deepgram TTS with the text, returning a path to a temp MP3 file.
438
+ We also do some pre-processing for punctuation, abbreviations, numeric expansions,
439
+ plus emotive expressions (ha, sigh, etc.).
440
  """
441
  try:
442
  print(f"[LOG] Generating audio for speaker: {speaker}")
 
493
  def _preprocess_text_for_tts(text: str, speaker: str) -> str:
494
  """
495
  1) "SaaS" => "sass"
496
+ 2) Insert periods in uppercase abbreviations -> remove for TTS
497
+ 3) Convert decimals like "3.14" -> "three point one four"
498
+ 4) Convert pure integer numbers like "20" -> "twenty"
499
  5) Expand leftover all-caps
500
+ 6) Emotive placeholders for 'ha', 'haha', 'sigh', 'groan', etc.
501
+ 7) If speaker != Jane, insert filler words
502
+ 8) Remove random fillers
503
+ 9) Capitalize sentence starts
504
  """
505
  # 1) "SaaS" => "sass"
506
  text = re.sub(r"\b(?i)SaaS\b", "sass", text)
507
 
508
+ # 2) Insert periods for uppercase abbreviations (>=2 chars), then remove them
509
  def insert_periods_for_abbrev(m):
510
  abbr = m.group(0)
511
  parted = ".".join(list(abbr)) + "."
512
  return parted
513
  text = re.sub(r"\b([A-Z0-9]{2,})\b", insert_periods_for_abbrev, text)
514
+ text = re.sub(r"\.\.", ".", text)
 
 
515
  def remove_periods_for_tts(m):
516
  chunk = m.group(0)
517
  return chunk.replace(".", " ").strip()
 
520
  # 3) Hyphens -> spaces
521
  text = re.sub(r"-", " ", text)
522
 
523
+ # 4) Convert decimals (e.g. "3.14")
524
  def convert_decimal(m):
525
  number_str = m.group()
526
  parts = number_str.split('.')
 
529
  return f"{whole_part} point {decimal_part}"
530
  text = re.sub(r"\b\d+\.\d+\b", convert_decimal, text)
531
 
532
+ # 5) Convert pure integer => words
 
533
  def convert_int_to_words(m):
534
  num_str = m.group()
 
 
 
535
  return number_to_words(int(num_str))
 
536
  text = re.sub(r"\b\d+\b", convert_int_to_words, text)
537
 
538
+ # 6) Expand leftover all-caps => "NASA" => "N A S A"
539
  def expand_abbreviations(m):
540
  abbrev = m.group()
541
  if abbrev.endswith('s') and abbrev[:-1].isupper():
 
542
  singular = abbrev[:-1]
543
  expanded = " ".join(list(singular)) + "s"
544
  special_plurals = {
 
551
  return " ".join(list(abbrev))
552
  text = re.sub(r"\b[A-Z]{2,}s?\b", expand_abbreviations, text)
553
 
554
+ # 7) Emotive placeholders
555
+ # "haha", "ha", "heh", "lol" => "(* laughs *)"
556
+ text = re.sub(r"\b(ha(ha)?|heh|lol)\b", "(* laughs *)", text, flags=re.IGNORECASE)
557
+ # "sigh" => "(* sighs *)"
558
+ text = re.sub(r"\bsigh\b", "(* sighs *)", text, flags=re.IGNORECASE)
559
+ # "groan", "moan" => "(* groans *)"
560
+ text = re.sub(r"\b(groan|moan)\b", "(* groans *)", text, flags=re.IGNORECASE)
561
+
562
+ # 8) Insert filler words if speaker != Jane
563
  if speaker != "Jane":
564
  def insert_thinking_pause(m):
565
  word = m.group(1)
 
571
  keywords_pattern = r"\b(important|significant|crucial|point|topic)\b"
572
  text = re.sub(keywords_pattern, insert_thinking_pause, text, flags=re.IGNORECASE)
573
 
 
574
  conj_pattern = r"\b(and|but|so|because|however)\b"
575
  text = re.sub(conj_pattern, lambda m: f"{m.group()}...", text, flags=re.IGNORECASE)
576
 
577
+ # 9) Remove random fillers
578
  text = re.sub(r"\b(uh|um|ah)\b", "", text, flags=re.IGNORECASE)
579
 
580
+ # 10) Capitalize sentence starts
581
  def capitalize_match(m):
582
  return m.group().upper()
583
  text = re.sub(r'(^\s*\w)|([.!?]\s*\w)', capitalize_match, text)
 
586
 
587
  def number_to_words(n: int) -> str:
588
  """
589
+ Basic integer-to-words up to ~99999.
590
+ For a robust approach, consider the 'num2words' library.
591
  """
592
  if n == 0:
593
  return "zero"
 
595
  if n < 0:
596
  return "minus " + number_to_words(abs(n))
597
 
 
598
  ones = ["","one","two","three","four","five","six","seven","eight","nine"]
599
+ teens = ["ten","eleven","twelve","thirteen","fourteen","fifteen",
600
+ "sixteen","seventeen","eighteen","nineteen"]
601
+ tens_words = ["","","twenty","thirty","forty","fifty",
602
+ "sixty","seventy","eighty","ninety"]
603
 
604
  words = []
605
+
606
  def two_digit_word(x):
607
  if x == 0:
608
  return ""
 
610
  return ones[x]
611
  if 10 <= x < 20:
612
  return teens[x-10]
613
+ t = x // 10
614
+ o = x % 10
615
+ return tens_words[t] + (f" {ones[o]}" if o else "")
 
616
 
 
617
  thousands = n // 1000
618
  remainder = n % 1000
619
  if thousands > 0:
620
  words.append(two_digit_word(thousands))
621
  words.append("thousand")
622
 
 
623
  hundreds = remainder // 100
624
  last_two = remainder % 100
625
  if hundreds > 0:
 
634
  return " ".join(w for w in words if w).strip()
635
 
636
  def mix_with_bg_music(spoken: AudioSegment, custom_music_path=None) -> AudioSegment:
637
+ """
638
+ Mixes 'spoken' with a default bg_music.mp3 or user-provided custom music:
639
+ 1) Start with 2 seconds of music alone before speech begins.
640
+ 2) Loop music if shorter than final audio length.
641
+ 3) Lower music volume so speech is clear.
642
+ """
643
  if custom_music_path:
644
  music_path = custom_music_path
645
  else: