siddhartharyaai committed (verified)
Commit 81ea4ea · 1 Parent(s): 945d767

Delete utils.py

Files changed (1):
utils.py +0 -554
utils.py DELETED
@@ -1,554 +0,0 @@
# utils.py

import os
import re
import json
import requests
import tempfile
from bs4 import BeautifulSoup
from typing import List, Literal
from pydantic import BaseModel
from pydub import AudioSegment, effects
from transformers import pipeline
import tiktoken
from groq import Groq
import torch
import random

class DialogueItem(BaseModel):
    speaker: Literal["Jane", "John"]
    text: str

class Dialogue(BaseModel):
    dialogue: List[DialogueItem]

# Initialize Whisper ASR pipeline (unused for YouTube now, but still available for local audio)
asr_pipeline = pipeline(
    "automatic-speech-recognition",
    model="openai/whisper-tiny.en",
    device=0 if torch.cuda.is_available() else -1
)

def truncate_text(text, max_tokens=2048):
    """
    If the text exceeds the max token limit (default 2,048 tokens), truncate it
    to avoid exceeding the model's context window.
    """
    print("[LOG] Truncating text if needed.")
    tokenizer = tiktoken.get_encoding("cl100k_base")
    tokens = tokenizer.encode(text)
    if len(tokens) > max_tokens:
        print("[LOG] Text too long, truncating.")
        return tokenizer.decode(tokens[:max_tokens])
    return text

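# Illustrative usage (sketch, not from the original file): with cl100k_base
# tokenization, 2,048 tokens is roughly 1,500 English words, so longer inputs
# come back truncated.
#
#   capped = truncate_text(long_article_text)                # default 2,048-token cap
#   short = truncate_text(long_article_text, max_tokens=512)
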
def extract_text_from_url(url):
    """
    Fetches and extracts readable text from a given URL
    (stripping out scripts, styles, etc.).
    """
    print("[LOG] Extracting text from URL:", url)
    try:
        response = requests.get(url, timeout=30)
        if response.status_code != 200:
            print(f"[ERROR] Failed to fetch URL: {url} with status code {response.status_code}")
            return ""
        soup = BeautifulSoup(response.text, 'html.parser')
        for script in soup(["script", "style"]):
            script.decompose()
        text = soup.get_text(separator=' ')
        print("[LOG] Text extraction from URL successful.")
        return text
    except Exception as e:
        print(f"[ERROR] Exception during text extraction from URL: {e}")
        return ""

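# Illustrative usage (sketch; the URL is a placeholder):
#
#   page_text = extract_text_from_url("https://example.com/news/story")
#   if not is_sufficient(page_text):
#       print("[LOG] Page too thin to use on its own.")
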
def pitch_shift(audio: AudioSegment, semitones: int) -> AudioSegment:
    """
    Shifts the pitch of an AudioSegment by a given number of semitones.
    Positive semitones shift the pitch up, negative shifts it down.
    """
    print(f"[LOG] Shifting pitch by {semitones} semitones.")
    new_sample_rate = int(audio.frame_rate * (2.0 ** (semitones / 12.0)))
    shifted_audio = audio._spawn(audio.raw_data, overrides={'frame_rate': new_sample_rate})
    return shifted_audio.set_frame_rate(audio.frame_rate)

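# Worked example (sketch, not from the original file): +2 semitones scales the
# frame rate by 2 ** (2 / 12) ≈ 1.122, so 44,100 Hz audio is respawned at
# ~49,500 Hz and then resampled back to 44,100 Hz. Note this is a speed-based
# shift: positive values also shorten the audio, like a tape played fast.
#
#   deeper = pitch_shift(AudioSegment.from_file("voice.mp3"), -3)
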
def is_sufficient(text: str, min_word_count: int = 500) -> bool:
    """
    Checks if the fetched text meets our sufficiency criteria
    (e.g., at least 500 words).
    """
    word_count = len(text.split())
    print(f"[DEBUG] Aggregated word count: {word_count}")
    return word_count >= min_word_count

def query_llm_for_additional_info(topic: str, existing_text: str) -> str:
    """
    Queries the Groq API to retrieve more info from the LLM's knowledge base.
    Returns the extra text for the caller to append to the aggregated info.
    """
    print("[LOG] Querying LLM for additional information.")
    system_prompt = (
        "You are an AI assistant with extensive knowledge up to 2023-10. "
        "Provide additional relevant information on the following topic based on your knowledge base.\n\n"
        f"Topic: {topic}\n\n"
        f"Existing Information: {existing_text}\n\n"
        "Please add more insightful details, facts, and perspectives to enhance the understanding of the topic."
    )
    groq_client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
    try:
        response = groq_client.chat.completions.create(
            messages=[{"role": "system", "content": system_prompt}],
            model="llama-3.3-70b-versatile",
            max_tokens=1024,
            temperature=0.7
        )
    except Exception as e:
        print("[ERROR] Groq API error during fallback:", e)
        return ""
    additional_info = response.choices[0].message.content.strip()
    print("[DEBUG] Additional information from LLM:")
    print(additional_info)
    return additional_info

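# Illustrative usage (sketch; assumes GROQ_API_KEY is set and existing_text
# holds whatever has been gathered so far):
#
#   extra = query_llm_for_additional_info("solid-state batteries", existing_text)
#   if extra:
#       existing_text += " " + extra
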
def research_topic(topic: str) -> str:
    """
    Gathers info from various RSS feeds and Wikipedia. If the aggregated text
    is insufficient, falls back to querying the LLM for more data.
    """
    sources = {
        "BBC": "https://feeds.bbci.co.uk/news/rss.xml",
        "CNN": "http://rss.cnn.com/rss/edition.rss",
        "Associated Press": "https://apnews.com/apf-topnews",
        "NDTV": "https://www.ndtv.com/rss/top-stories",
        "Times of India": "https://timesofindia.indiatimes.com/rssfeeds/296589292.cms",
        "The Hindu": "https://www.thehindu.com/news/national/kerala/rssfeed.xml",
        "Economic Times": "https://economictimes.indiatimes.com/rssfeeds/1977021501.cms",
        "Google News - Custom": f"https://news.google.com/rss/search?q={requests.utils.quote(topic)}&hl=en-IN&gl=IN&ceid=IN:en",
    }

    summary_parts = []

    # Wikipedia summary
    wiki_summary = fetch_wikipedia_summary(topic)
    if wiki_summary:
        summary_parts.append(f"From Wikipedia: {wiki_summary}")

    # For each RSS feed
    for name, url in sources.items():
        try:
            items = fetch_rss_feed(url)
            if not items:
                continue
            title, desc, link = find_relevant_article(items, topic, min_match=2)
            if link:
                article_text = fetch_article_text(link)
                if article_text:
                    summary_parts.append(f"From {name}: {article_text}")
                else:
                    summary_parts.append(f"From {name}: {title} - {desc}")
        except Exception as e:
            print(f"[ERROR] Error fetching from {name} RSS feed:", e)
            continue

    aggregated_info = " ".join(summary_parts)
    print("[DEBUG] Aggregated info from primary sources:")
    print(aggregated_info)

    # If not enough data, fall back to the LLM
    if not is_sufficient(aggregated_info):
        print("[LOG] Insufficient info from primary sources. Fallback to LLM.")
        additional_info = query_llm_for_additional_info(topic, aggregated_info)
        if additional_info:
            aggregated_info += " " + additional_info
        else:
            print("[ERROR] Failed to retrieve additional info from LLM.")

    if not aggregated_info:
        return f"Sorry, I couldn't find recent information on '{topic}'."

    return aggregated_info

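# Illustrative usage (sketch; performs live network calls, and falls back to
# the Groq LLM when the feeds yield fewer than 500 words):
#
#   background = research_topic("semiconductor manufacturing in India")
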
def fetch_wikipedia_summary(topic: str) -> str:
    """
    Fetch a quick Wikipedia summary of the topic via the official Wikipedia API.
    """
    print("[LOG] Fetching Wikipedia summary for:", topic)
    try:
        search_url = (
            f"https://en.wikipedia.org/w/api.php?action=opensearch&search={requests.utils.quote(topic)}"
            "&limit=1&namespace=0&format=json"
        )
        resp = requests.get(search_url, timeout=30)
        if resp.status_code != 200:
            print(f"[ERROR] Failed to fetch Wikipedia search results for {topic}")
            return ""
        data = resp.json()
        if len(data) > 1 and data[1]:
            title = data[1][0]
            summary_url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{requests.utils.quote(title)}"
            s_resp = requests.get(summary_url, timeout=30)
            if s_resp.status_code == 200:
                s_data = s_resp.json()
                if "extract" in s_data:
                    print("[LOG] Wikipedia summary fetched successfully.")
                    return s_data["extract"]
        return ""
    except Exception as e:
        print(f"[ERROR] Exception during Wikipedia summary fetch: {e}")
        return ""

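# Illustrative usage (sketch): opensearch resolves a free-text topic to a
# canonical page title, and the REST summary endpoint then returns that
# page's lead "extract".
#
#   summary = fetch_wikipedia_summary("large language models")
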
def fetch_rss_feed(feed_url: str) -> list:
    """
    Pulls RSS feed data from a given URL and returns its <item> elements.
    """
    print("[LOG] Fetching RSS feed:", feed_url)
    try:
        resp = requests.get(feed_url, timeout=30)
        if resp.status_code != 200:
            print(f"[ERROR] Failed to fetch RSS feed: {feed_url}")
            return []
        soup = BeautifulSoup(resp.content, "xml")  # the "xml" parser requires lxml
        items = soup.find_all("item")
        return items
    except Exception as e:
        print(f"[ERROR] Exception fetching RSS feed {feed_url}: {e}")
        return []

def find_relevant_article(items, topic: str, min_match=2) -> tuple:
    """
    Checks each article in the RSS feed for mentions of the topic
    by counting the number of keyword matches.
    """
    print("[LOG] Finding relevant articles...")
    keywords = re.findall(r'\w+', topic.lower())
    for item in items:
        title = item.find("title").get_text().strip() if item.find("title") else ""
        description = item.find("description").get_text().strip() if item.find("description") else ""
        text = (title + " " + description).lower()
        matches = sum(1 for kw in keywords if kw in text)  # substring match per keyword
        if matches >= min_match:
            link = item.find("link").get_text().strip() if item.find("link") else ""
            print(f"[LOG] Relevant article found: {title}")
            return title, description, link
    return None, None, None

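# Worked example (sketch): for topic "electric vehicles" the keywords are
# ["electric", "vehicles"]; an item titled "India's electric vehicles push"
# matches both (>= min_match), so its title/description/link are returned on
# the first hit. Matching is substring-based, so short keywords can over-match.
#
#   title, desc, link = find_relevant_article(items, "electric vehicles")
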
def fetch_article_text(link: str) -> str:
    """
    Fetch the article text from the given link (first 5 paragraphs).
    """
    print("[LOG] Fetching article text from:", link)
    if not link:
        print("[LOG] No link provided for article text.")
        return ""
    try:
        resp = requests.get(link, timeout=30)
        if resp.status_code != 200:
            print(f"[ERROR] Failed to fetch article from {link}")
            return ""
        soup = BeautifulSoup(resp.text, 'html.parser')
        paragraphs = soup.find_all("p")
        text = " ".join(p.get_text() for p in paragraphs[:5])  # first 5 paragraphs
        print("[LOG] Article text fetched successfully.")
        return text.strip()
    except Exception as e:
        print(f"[ERROR] Error fetching article text: {e}")
        return ""

def generate_script(system_prompt: str, input_text: str, tone: str, target_length: str):
    """
    Sends the system_prompt plus input_text to the Groq LLM to generate a
    multi-speaker dialogue in JSON. We parse and return it as a Dialogue object.
    """
    print("[LOG] Generating script with tone:", tone, "and length:", target_length)
    groq_client = Groq(api_key=os.environ.get("GROQ_API_KEY"))

    # Map length string to word ranges
    length_mapping = {
        "1-3 Mins": (200, 450),
        "3-5 Mins": (450, 750),
        "5-10 Mins": (750, 1500),
        "10-20 Mins": (1500, 3000)
    }
    min_words, max_words = length_mapping.get(target_length, (200, 450))

    tone_description = {
        "Humorous": "funny and exciting, makes people chuckle",
        "Formal": "business-like, well-structured, professional",
        "Casual": "like a conversation between close friends, relaxed and informal",
        "Youthful": "like how teenagers might chat, energetic and lively"
    }
    chosen_tone = tone_description.get(tone, "casual")

    # Construct prompt
    prompt = (
        f"{system_prompt}\n"
        f"TONE: {chosen_tone}\n"
        f"TARGET LENGTH: {target_length} ({min_words}-{max_words} words)\n"
        f"INPUT TEXT: {input_text}\n\n"
        "Please provide the output in the following JSON format without any additional text:\n\n"
        "{\n"
        '  "dialogue": [\n'
        '    {\n'
        '      "speaker": "Jane",\n'
        '      "text": "..."\n'
        '    },\n'
        '    {\n'
        '      "speaker": "John",\n'
        '      "text": "..."\n'
        '    }\n'
        "  ]\n"
        "}"
    )
    print("[LOG] Sending prompt to Groq:")
    print(prompt)

    try:
        response = groq_client.chat.completions.create(
            messages=[{"role": "system", "content": prompt}],
            model="llama-3.3-70b-versatile",
            max_tokens=2048,
            temperature=0.7
        )
    except Exception as e:
        print("[ERROR] Groq API error:", e)
        raise ValueError(f"Error communicating with Groq API: {str(e)}")

    raw_content = response.choices[0].message.content.strip()
    # Attempt to parse the JSON object embedded in the response
    start_index = raw_content.find('{')
    end_index = raw_content.rfind('}')
    if start_index == -1 or end_index == -1:
        raise ValueError("Failed to parse dialogue: No JSON found.")

    json_str = raw_content[start_index:end_index + 1].strip()
    try:
        data = json.loads(json_str)
        return Dialogue(**data)
    except Exception as e:
        print("[ERROR] JSON decoding failed:", e)
        raise ValueError(f"Failed to parse dialogue: {str(e)}")

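# Illustrative usage (sketch; assumes GROQ_API_KEY is set, and the system
# prompt below is a hypothetical placeholder):
#
#   script = generate_script(
#       "You are a podcast script writer for two hosts, Jane and John.",
#       input_text=truncate_text(research_topic("quantum computing")),
#       tone="Casual",
#       target_length="3-5 Mins",
#   )
#   for line in script.dialogue:
#       print(f"{line.speaker}: {line.text}")
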
# ----------------------------------------------------------------------
# RapidAPI-based approach (replaces the earlier yt-dlp local transcription)
# ----------------------------------------------------------------------
def transcribe_youtube_video(video_url: str) -> str:
    """
    Transcribe the given YouTube video by calling the RapidAPI 'youtube-transcriptor' endpoint.
    1) Extract the 11-character video ID from the YouTube URL.
    2) Call the RapidAPI endpoint (lang=en).
    3) Parse and extract 'transcriptionAsText' from the response.
    4) Return that transcript as a string.
    """
    print("[LOG] Transcribing YouTube video via RapidAPI:", video_url)
    # Extract video ID
    video_id_match = re.search(r"(?:v=|\/)([0-9A-Za-z_-]{11})", video_url)
    if not video_id_match:
        raise ValueError(f"Invalid YouTube URL: {video_url}, cannot extract video ID.")

    video_id = video_id_match.group(1)
    print("[LOG] Extracted video ID:", video_id)

    base_url = "https://youtube-transcriptor.p.rapidapi.com/transcript"
    params = {
        "video_id": video_id,
        "lang": "en"
    }
    headers = {
        "x-rapidapi-host": "youtube-transcriptor.p.rapidapi.com",
        "x-rapidapi-key": os.environ.get("RAPIDAPI_KEY")
    }

    try:
        response = requests.get(base_url, headers=headers, params=params, timeout=30)
        print("[LOG] RapidAPI Response Status Code:", response.status_code)
        print("[LOG] RapidAPI Response Body:", response.text)  # Log the full response

        if response.status_code != 200:
            raise ValueError(f"RapidAPI transcription error: {response.status_code}, {response.text}")

        data = response.json()
        if not isinstance(data, list) or not data:
            raise ValueError(f"Unexpected transcript format or empty transcript: {data}")

        # Extract 'transcriptionAsText'
        transcript_as_text = data[0].get('transcriptionAsText', '').strip()
        if not transcript_as_text:
            raise ValueError("transcriptionAsText field is missing or empty.")

        print("[LOG] Transcript retrieval successful.")
        print(f"[DEBUG] Transcript Length: {len(transcript_as_text)} characters.")

        # Optionally, print a snippet of the transcript
        if len(transcript_as_text) > 200:
            snippet = transcript_as_text[:200] + "..."
        else:
            snippet = transcript_as_text
        print(f"[DEBUG] Transcript Snippet: {snippet}")

        return transcript_as_text

    except Exception as e:
        print("[ERROR] RapidAPI transcription error:", e)
        raise ValueError(f"Error transcribing YouTube video via RapidAPI: {str(e)}")

def generate_audio_mp3(text: str, speaker: str) -> str:
    """
    Calls Deepgram TTS with the text, returning a path to a temp MP3 file.
    We also do some pre-processing for punctuation, abbreviations, etc.
    """
    try:
        print(f"[LOG] Generating audio for speaker: {speaker}")

        # Preprocess text
        processed_text = _preprocess_text_for_tts(text)

        # Deepgram TTS endpoint
        deepgram_api_url = "https://api.deepgram.com/v1/speak"
        params = {
            "model": "aura-asteria-en",  # default
        }
        if speaker == "John":
            params["model"] = "aura-zeus-en"

        headers = {
            "Accept": "audio/mpeg",
            "Content-Type": "application/json",
            "Authorization": f"Token {os.environ.get('DEEPGRAM_API_KEY')}"
        }
        body = {
            "text": processed_text
        }

        response = requests.post(deepgram_api_url, params=params, headers=headers, json=body, stream=True)
        if response.status_code != 200:
            raise ValueError(f"Deepgram TTS error: {response.status_code}, {response.text}")

        content_type = response.headers.get('Content-Type', '')
        if 'audio/mpeg' not in content_type:
            raise ValueError("Unexpected Content-Type from Deepgram.")

        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as mp3_file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    mp3_file.write(chunk)
            mp3_path = mp3_file.name

        # Normalize volume
        audio_seg = AudioSegment.from_file(mp3_path, format="mp3")
        audio_seg = effects.normalize(audio_seg)

        final_mp3_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3").name
        audio_seg.export(final_mp3_path, format="mp3")

        if os.path.exists(mp3_path):
            os.remove(mp3_path)

        return final_mp3_path
    except Exception as e:
        print("[ERROR] Error generating audio:", e)
        raise ValueError(f"Error generating audio: {str(e)}")

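# Illustrative usage (sketch; requires DEEPGRAM_API_KEY, and the caller is
# responsible for removing the returned temp file):
#
#   jane_mp3 = generate_audio_mp3("Welcome back to the show!", speaker="Jane")
#   john_mp3 = generate_audio_mp3("Great to be here.", speaker="John")
#   combined = AudioSegment.from_file(jane_mp3) + AudioSegment.from_file(john_mp3)
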
def transcribe_youtube_video_OLD_YTDLP(video_url: str) -> str:
    """
    Original yt-dlp-based approach for local transcription.
    No longer used, but kept for reference.
    """
    pass

# ---------------------------------------------------------------------
# TEXT PRE-PROCESSING FOR NATURAL TTS (punctuation, abbreviations, etc.)
# ---------------------------------------------------------------------
def _preprocess_text_for_tts(text: str) -> str:
    """
    Enhances text for natural-sounding TTS by handling abbreviations,
    punctuation, and intelligent filler insertion.
    """
    # 1) Hyphens -> spaces
    text = re.sub(r"-", " ", text)

    # 2) Convert decimals (e.g., 3.14 -> 'three point one four')
    def convert_decimal(m):
        number_str = m.group()
        parts = number_str.split('.')
        whole_part = _spell_digits(parts[0])
        decimal_part = " ".join(_spell_digits(d) for d in parts[1])
        return f"{whole_part} point {decimal_part}"

    text = re.sub(r"\d+\.\d+", convert_decimal, text)

    # 3) Abbreviations (e.g., NASA -> N A S A, MPs -> M Peas)
    def expand_abbreviations(match):
        abbrev = match.group()
        # Check if it's a plural abbreviation
        if abbrev.endswith('s') and abbrev[:-1].isupper():
            singular = abbrev[:-1]
            expanded = " ".join(list(singular)) + "s"  # append 's' to the expanded form
            # Handle specific plural forms
            specific_plural = {
                "MPs": "M Peas",
                "TMTs": "T M Tees",
                "ARJs": "A R Jays",
                # Add more as needed
            }
            return specific_plural.get(abbrev, expanded)
        else:
            return " ".join(list(abbrev))

    # Regex to match abbreviations (all uppercase letters, possibly ending with 's')
    text = re.sub(r"\b[A-Z]{2,}s?\b", expand_abbreviations, text)

    # 4) Ellipsis insertion after sentence punctuation was removed to avoid long pauses.

    # 5) Intelligent filler insertion after specific keywords
    def insert_thinking_pause(m):
        word = m.group(1)
        # Decide randomly whether to insert a filler
        if random.random() < 0.3:  # 30% chance
            filler = random.choice(['hmm,', 'well,', 'let me see,'])
            return f"{word}..., {filler}"
        else:
            return f"{word}...,"

    keywords_pattern = r"\b(important|significant|crucial|point|topic)\b"
    text = re.sub(keywords_pattern, insert_thinking_pause, text, flags=re.IGNORECASE)

    # 6) Insert dynamic pauses within sentences (e.g., after conjunctions)
    # This adds natural pauses without overusing fillers
    conjunctions_pattern = r"\b(and|but|so|because|however)\b"
    text = re.sub(conjunctions_pattern, lambda m: f"{m.group()}...", text, flags=re.IGNORECASE)

    # 7) Remove any unintended random fillers (safeguard)
    text = re.sub(r"\b(uh|um|ah)\b", "", text, flags=re.IGNORECASE)

    # 8) Capitalize the start of the text and of each sentence
    def capitalize_match(match):
        return match.group().upper()

    text = re.sub(r'(^\s*\w)|([.!?]\s*\w)', capitalize_match, text)

    return text.strip()

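# Worked example (sketch, not from the original file): the input
# "NASA spent 3.5 billion, but it was important" becomes roughly
# "N A S A spent three point five billion, but... it was important...,"
# with a 30% chance of an extra filler such as "hmm," after "important...,".
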
def _spell_digits(d: str) -> str:
    """
    Convert digits, e.g. '3' -> 'three'.
    """
    digit_map = {
        '0': 'zero',
        '1': 'one',
        '2': 'two',
        '3': 'three',
        '4': 'four',
        '5': 'five',
        '6': 'six',
        '7': 'seven',
        '8': 'eight',
        '9': 'nine'
    }
    return " ".join(digit_map[ch] for ch in d if ch in digit_map)