Toumaima committed on
Commit 9f6cf12 · verified · 1 Parent(s): 704ac65

Update app.py

Files changed (1):
  1. app.py (+53, -190)
app.py CHANGED
@@ -1,225 +1,88 @@
-import os
-import time
-import moviepy
-import requests
-import whisper
-import gradio as gr
-import pandas as pd
-from duckduckgo_search import DDGS
 from transformers import pipeline
 from sklearn.metrics.pairwise import cosine_similarity
 import numpy as np
-from bs4 import BeautifulSoup

-# --- Constants ---
-DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

-# --- Basic Agent Definition ---
 class BasicAgent:
     def __init__(self):
         print("BasicAgent initialized.")
-        # Initialize Whisper model for video transcription
-        self.whisper_model = whisper.load_model("base")  # You can change the model to `large`, `medium`, etc.
-        self.search_pipeline = pipeline("question-answering")
-        self.nlp_model = pipeline("feature-extraction")  # For semantic similarity (using transformer model)
-        self.ner_pipeline = pipeline("ner", grouped_entities=True)

-    def extract_person_entities(self, text: str) -> list:
-        # Extract named entities (persons) from the text
-        entities = self.ner_pipeline(text[:1000])
-        return [e['word'] for e in entities if e['entity_group'] == 'PER']

-    def extract_wikipedia_nominator(self, search_results: list) -> str:
-        # Check if search result contains Wikipedia nomination info
-        for result in search_results:
-            if "Wikipedia:Featured_article_candidates" in result.get('href', ''):
-                try:
-                    response = requests.get(result['href'], timeout=10)
-                    soup = BeautifulSoup(response.text, 'html.parser')
-                    text = soup.get_text()
-                    for line in text.split("\n"):
-                        if "nominated by" in line.lower():
-                            persons = self.extract_person_entities(line)
-                            return f"Nominated by {persons[0]}" if persons else line.strip()
-                except Exception:
-                    continue
-        return None

-    def score_search_results(self, question: str, search_results: list) -> str:
-        # Calculate semantic similarity and score the search results
-        question_embedding = np.mean(self.nlp_model(question)[0], axis=0)
-        best_score = -1
-        best_answer = None
-        for result in search_results:
-            result_embedding = np.mean(self.nlp_model(result['body'])[0], axis=0)
-            similarity = cosine_similarity([question_embedding], [result_embedding])[0][0]
-            if similarity > best_score:
-                best_score = similarity
-                best_answer = result['body']
-        return best_answer or "No high-confidence answer found."

     def search(self, question: str) -> str:
         try:
             with DDGS() as ddgs:
-                results = list(ddgs.text(question, max_results=5))  # Fetch top 5 results
                 if not results:
                     return "No relevant search results found."

-                # If the question relates to Wikipedia Featured Article nomination, check for nomination
-                if "featured article" in question.lower() and "wikipedia" in question.lower():
-                    nomination_info = self.extract_wikipedia_nominator(results)
-                    if nomination_info:
-                        return nomination_info

-                # Otherwise, return the best search result based on semantic similarity
-                return self.score_search_results(question, results)
         except Exception as e:
             return f"Search error: {e}"

-    def call_whisper(self, video_path: str) -> str:
-        # Transcribe video using Whisper
-        video = moviepy.editor.VideoFileClip(video_path)
-        audio_path = "temp_audio.wav"
-        video.audio.write_audiofile(audio_path)
-        result = self.whisper_model.transcribe(audio_path)
-        return result["text"]

     def __call__(self, question: str, video_path: str = None) -> str:
-        print(f"Agent received question (first 50 chars): {question[:50]}...")
         if video_path:
             transcription = self.call_whisper(video_path)
-            print(f"Transcribed video text: {transcription[:100]}...")
             return transcription

-        answer = self.search(question)
-        print(f"Agent returning search result: {answer[:100]}...")
-        time.sleep(2)
-        return answer

-# --- Run and Submit All ---
-def run_and_submit_all(profile: gr.OAuthProfile | None):
-    """
-    Fetches all questions, runs the BasicAgent on them, submits all answers,
-    and displays the results.
-    """
-    # --- Determine HF Space Runtime URL and Repo URL ---
-    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for sending link to the code

-    if profile:
-        username = f"{profile.username}"
-        print(f"User logged in: {username}")
-    else:
-        print("User not logged in.")
-        return "Please Login to Hugging Face with the button.", None

-    api_url = DEFAULT_API_URL
-    questions_url = f"{api_url}/questions"
-    submit_url = f"{api_url}/submit"

-    # 1. Instantiate Agent
-    try:
-        agent = BasicAgent()
-    except Exception as e:
-        print(f"Error instantiating agent: {e}")
-        return f"Error initializing agent: {e}", None

-    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
-    print(agent_code)

-    # 2. Fetch Questions
-    print(f"Fetching questions from: {questions_url}")
-    try:
-        response = requests.get(questions_url, timeout=15)
-        response.raise_for_status()
-        questions_data = response.json()
-        if not questions_data:
-            print("Fetched questions list is empty.")
-            return "Fetched questions list is empty or invalid format.", None
-        print(f"Fetched {len(questions_data)} questions.")
-    except requests.exceptions.RequestException as e:
-        print(f"Error fetching questions: {e}")
-        return f"Error fetching questions: {e}", None
-    except requests.exceptions.JSONDecodeError as e:
-        print(f"Error decoding JSON response from questions endpoint: {e}")
-        return f"Error decoding server response for questions: {e}", None
-    except Exception as e:
-        print(f"An unexpected error occurred fetching questions: {e}")
-        return f"An unexpected error occurred fetching questions: {e}", None

-    # 3. Run Agent
-    results_log = []
-    answers_payload = []
-    print(f"Running agent on {len(questions_data)} questions...")
-    for item in questions_data:
-        task_id = item.get("task_id")
-        question_text = item.get("question")
-        video_link = item.get("video_link")  # Assuming the question contains an optional video link

-        if not task_id or question_text is None:
-            print(f"Skipping item with missing task_id or question: {item}")
-            continue

-        try:
-            # Pass video_link if available, else just the question text
-            submitted_answer = agent(question_text, video_path=video_link)
-            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
-            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
-        except Exception as e:
-            print(f"Error running agent on task {task_id}: {e}")
-            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

-    if not answers_payload:
-        print("Agent did not produce any answers to submit.")
-        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

-    # 4. Prepare Submission
-    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
-    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
-    print(status_update)

-    # 5. Submit
-    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
-    try:
-        response = requests.post(submit_url, json=submission_data, timeout=60)
-        response.raise_for_status()
-        result_data = response.json()
-        final_status = (
-            f"Submission Successful!\n"
-            f"User: {result_data.get('username')}\n"
-            f"Overall Score: {result_data.get('score', 'N/A')}% "
-            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
-            f"Message: {result_data.get('message', 'No message received.')}"
-        )
-        print("Submission successful.")
-        results_df = pd.DataFrame(results_log)
-        return final_status, results_df
-    except requests.exceptions.HTTPError as e:
-        error_detail = f"Server responded with status {e.response.status_code}."
-        try:
-            error_json = e.response.json()
-            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
-        except requests.exceptions.JSONDecodeError:
-            error_detail += f" Response: {e.response.text[:500]}"
-        status_message = f"Submission Failed: {error_detail}"
-        print(status_message)
-        results_df = pd.DataFrame(results_log)
-        return status_message, results_df
-    except requests.exceptions.Timeout:
-        status_message = "Submission Failed: The request timed out."
-        print(status_message)
-        results_df = pd.DataFrame(results_log)
-        return status_message, results_df
-    except requests.exceptions.RequestException as e:
-        status_message = f"Submission Failed: Network error - {e}"
-        print(status_message)
-        results_df = pd.DataFrame(results_log)
-        return status_message, results_df
-    except Exception as e:
-        status_message = f"An unexpected error occurred during submission: {e}"
-        print(status_message)
-        results_df = pd.DataFrame(results_log)
-        return status_message, results_df

 # --- Build Gradio Interface using Blocks ---
 
+import re
+import spacy
 from transformers import pipeline
+from duckduckgo_search import DDGS
 from sklearn.metrics.pairwise import cosine_similarity
 import numpy as np
+import whisper
+import moviepy.editor

 class BasicAgent:
     def __init__(self):
         print("BasicAgent initialized.")
+        self.whisper_model = whisper.load_model("base")
+        self.qa_pipeline = pipeline("question-answering")
+        self.ner_pipeline = pipeline("ner", aggregation_strategy="simple")
+        self.embedding_model = pipeline("feature-extraction")
+        self.spacy = spacy.load("en_core_web_sm")

+    def extract_named_entities(self, text):
+        entities = self.ner_pipeline(text)
+        return [e["word"] for e in entities if e["entity_group"] == "PER"]

+    def extract_numbers(self, text):
+        return re.findall(r"\d+", text)

+    def extract_keywords(self, text):
+        doc = self.spacy(text)
+        return [token.text for token in doc if token.pos_ in ["NOUN", "PROPN"]]

+    def call_whisper(self, video_path: str) -> str:
+        video = moviepy.editor.VideoFileClip(video_path)
+        audio_path = "temp_audio.wav"
+        video.audio.write_audiofile(audio_path)
+        result = self.whisper_model.transcribe(audio_path)
+        return result["text"]

     def search(self, question: str) -> str:
         try:
             with DDGS() as ddgs:
+                results = list(ddgs.text(question, max_results=3))
                 if not results:
                     return "No relevant search results found."
+                context = results[0]["body"]
+                return context
         except Exception as e:
             return f"Search error: {e}"

+    def answer_question(self, question: str, context: str) -> str:
+        try:
+            return self.qa_pipeline(question=question, context=context)["answer"]
+        except:
+            return context  # Fallback to context if QA fails

     def __call__(self, question: str, video_path: str = None) -> str:
+        print(f"Agent received question: {question[:60]}...")

         if video_path:
             transcription = self.call_whisper(video_path)
+            print(f"Transcribed video: {transcription[:100]}...")
             return transcription

+        context = self.search(question)
+        answer = self.answer_question(question, context)
+        q_lower = question.lower()

+        # Enhance based on question type
+        if "who" in q_lower:
+            people = self.extract_named_entities(context)
+            return f"👤 Who: {', '.join(people) if people else 'No person found'}\n\n🧠 Answer: {answer}"

+        elif "how many" in q_lower:
+            numbers = self.extract_numbers(context)
+            return f"🔢 How many: {', '.join(numbers) if numbers else 'No numbers found'}\n\n🧠 Answer: {answer}"

+        elif "how" in q_lower:
+            return f"⚙️ How: {answer}"

+        elif "what" in q_lower or "where" in q_lower:
+            keywords = self.extract_keywords(context)
+            return f"🗝️ Keywords: {', '.join(keywords[:5])}\n\n🧠 Answer: {answer}"

+        else:
+            return f"🧠 Answer: {answer}"

 # --- Build Gradio Interface using Blocks ---
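
With this commit, BasicAgent returns a Whisper transcription when a video path is supplied, and otherwise answers from the top DuckDuckGo result via the question-answering pipeline, formatting the reply by question type. A minimal local smoke test might look like the sketch below; it is not part of the commit, the question string is made up for illustration, and it assumes the file is saved as app.py, that the listed packages and the en_core_web_sm spaCy model are installed, and that the Gradio code further down in app.py sits behind an if __name__ == "__main__": guard so that importing the module does not launch the Space.

# Hypothetical local check of the updated agent (not part of this commit).
# Assumed setup: pip install transformers torch duckduckgo_search openai-whisper moviepy spacy scikit-learn
#                python -m spacy download en_core_web_sm
from app import BasicAgent  # assumes the new file is saved as app.py

agent = BasicAgent()

# Text-only question: routed through search() and answer_question(), then
# formatted by the "who" / "how many" / "how" / "what" branches in __call__.
print(agent("Who founded Hugging Face?"))

# With a video path, __call__ returns the Whisper transcription instead.
# print(agent("ignored", video_path="sample.mp4"))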