Spaces:
Running
Running
import pandas as pd | |
from collections import defaultdict | |
from difflib import SequenceMatcher | |
from sentence_transformers import SentenceTransformer | |
import faiss | |
import json | |
import torch | |
import numpy as np | |
from transformers import AutoTokenizer, AutoModel | |
index = faiss.read_index("iau_reviews_index.faiss") | |
with open("iau_metadata.json", "r", encoding="utf-8") as f: | |
metadata = json.load(f) | |
model = SentenceTransformer("HooshvareLab/bert-fa-zwnj-base") | |
tokenizer = AutoTokenizer.from_pretrained("HooshvareLab/bert-fa-zwnj-base") | |
model = AutoModel.from_pretrained("HooshvareLab/bert-fa-zwnj-base").eval() | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
model.to(device) | |
index = faiss.read_index("iau_reviews_index.faiss") | |
with open("iau_metadata.json", "r", encoding="utf-8") as f: | |
metadata = json.load(f) | |
def mean_pooling(model_output, attention_mask): | |
token_embeddings = model_output[0] | |
input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()) | |
return (token_embeddings * input_mask_expanded).sum(1) / input_mask_expanded.sum(1) | |
def encode_texts(texts, batch_size=16): | |
embeddings = [] | |
with torch.no_grad(): | |
for i in range(0, len(texts), batch_size): | |
batch = texts[i:i+batch_size] | |
encoded_input = tokenizer(batch, padding=True, truncation=True, return_tensors='pt', max_length=128).to(device) | |
model_output = model(**encoded_input) | |
sentence_embeddings = mean_pooling(model_output, encoded_input['attention_mask']) | |
sentence_embeddings = sentence_embeddings.cpu().numpy() | |
embeddings.append(sentence_embeddings) | |
return np.vstack(embeddings) | |
def search_reviews(query, top_k=5): | |
keywords = query.strip().split() | |
candidate_rows = [ | |
r for r in metadata | |
if any(kw in r["professor"] or kw in r["course"] for kw in keywords) | |
] | |
if not candidate_rows: | |
return [] | |
texts = [r["course"] + " " + r["professor"] + " " + r["comment"] for r in candidate_rows] | |
vectors = encode_texts(texts) | |
vectors = vectors / np.linalg.norm(vectors, axis=1, keepdims=True) | |
query_vec = encode_texts([query]) | |
query_vec = query_vec / np.linalg.norm(query_vec, axis=1, keepdims=True) | |
local_index = faiss.IndexFlatIP(vectors.shape[1]) | |
local_index.add(vectors) | |
D, I = local_index.search(query_vec, min(top_k, len(candidate_rows))) | |
return [candidate_rows[i] for i in I[0]] | |
def filter_relevant(results, query): | |
query = query.replace("؟", "").strip() | |
query_tokens = set(query.split()) | |
def is_strict_match(row): | |
prof_tokens = set(str(row["professor"]).strip().split()) | |
course_tokens = set(str(row["course"]).strip().split()) | |
match_prof = prof_tokens & query_tokens | |
match_course = course_tokens & query_tokens | |
return bool(match_prof or match_course) | |
return [r for r in results if is_strict_match(r)] | |
def similar(a, b): | |
return SequenceMatcher(None, a, b).ratio() | |
def keyword_match_reviews(query, metadata): | |
query = query.strip().replace("؟", "") | |
keywords = set(query.split()) | |
results = [] | |
for row in metadata: | |
prof = str(row["professor"]) | |
course = str(row["course"]) | |
for k in keywords: | |
if k in prof or k in course or similar(k, prof) > 0.7 or similar(k, course) > 0.7: | |
results.append(row) | |
break | |
return results | |
def relevance_score(row, query): | |
score = 0 | |
if row["professor"] in query: | |
score += 2 | |
if row["course"] in query: | |
score += 2 | |
if row["professor"].split()[0] in query: | |
score += 1 | |
if row["course"].split()[0] in query: | |
score += 1 | |
return score | |
def build_strict_context(reviews, user_question): | |
prof_match_scores = defaultdict(int) | |
course_match_scores = defaultdict(int) | |
for r in reviews: | |
prof_sim = similar(user_question, r["professor"]) | |
course_sim = similar(user_question, r["course"]) | |
if prof_sim > 0.6: | |
prof_match_scores[r["professor"]] += prof_sim | |
if course_sim > 0.6: | |
course_match_scores[r["course"]] += course_sim | |
best_prof = max(prof_match_scores, key=prof_match_scores.get, default="") | |
best_course = max(course_match_scores, key=course_match_scores.get, default="") | |
if best_prof and best_course: | |
filtered = [ | |
r for r in reviews | |
if similar(best_prof, r["professor"]) > 0.85 and similar(best_course, r["course"]) > 0.85 | |
] | |
elif best_course: | |
filtered = [r for r in reviews if similar(best_course, r["course"]) > 0.85] | |
elif best_prof: | |
filtered = [r for r in reviews if similar(best_prof, r["professor"]) > 0.85] | |
else: | |
filtered = reviews | |
result = f"👨🏫 استاد: {best_prof or '[نامشخص]'} — 📚 درس: {best_course or '[نامشخص]'}\n💬 نظرات:\n" | |
for i, r in enumerate(filtered, 1): | |
result += f"{i}. {r['comment'].strip()}\n🔗 لینک: {r['link']}\n\n" | |
return result | |
def truncate_reviews_to_fit(reviews, max_chars=127000): | |
total = 0 | |
final = [] | |
for r in reviews: | |
size = len(r["comment"]) | |
if total + size > max_chars: | |
break | |
final.append(r) | |
total += size | |
return final | |
def answer_question(user_question, gemini_model): | |
print(f"\n🧠 Starting debug for question: {user_question}") | |
retrieved = search_reviews(user_question, top_k=100) | |
print(f"🔍 FAISS returned {len(retrieved)} raw rows") | |
retrieved = filter_relevant(retrieved, user_question) | |
print(f"✅ After filter_relevant(): {len(retrieved)} rows") | |
keyword_hits = keyword_match_reviews(user_question, metadata) | |
print(f"🔠 Keyword hits found: {len(keyword_hits)}") | |
existing_links = set(r["link"] for r in retrieved) | |
added = 0 | |
for r in keyword_hits: | |
if r["link"] not in existing_links: | |
retrieved.append(r) | |
added += 1 | |
print(f"➕ Added {added} unique fallback keyword rows") | |
print(f"📊 Total before truncation: {len(retrieved)}") | |
if not retrieved: | |
return "❌ هیچ تجربهای در مورد سوال شما در دادههای کانال یافت نشد." | |
retrieved.sort(key=lambda r: relevance_score(r, user_question), reverse=True) | |
retrieved = truncate_reviews_to_fit(retrieved) | |
print(f"✂️ After truncation: {len(retrieved)} rows") | |
context = build_strict_context(retrieved, user_question) | |
print("📝 Sample context sent to LLM:\n", context[:100000], "\n...") | |
prompt = f"""شما یک دستیار هوشمند انتخاب واحد هستید که فقط و فقط بر اساس نظرات واقعی دانشجویان از کانال @IAUCourseExp پاسخ میدهید. کار شما کمک به دانشجویان برای انتخاب استاد و درس، بر اساس تجربیات ثبتشده در این کانال است. | |
❗ قوانین مهم: | |
- فقط از دادههای همین نظرات استفاده کن. هیچ اطلاعات اضافی، حدسی یا اینترنتی استفاده نکن. | |
- اگر هیچ نظری درباره سؤال وجود ندارد، فقط بگو: «هیچ تجربهای دربارهٔ این مورد در کانال ثبت نشده است.» | |
- سوالات دانشجو میتوانند از انواع مختلف باشند: | |
• بررسی یک استاد خاص | |
• مقایسه چند استاد برای یک درس | |
• معرفی بهترین یا بدترین استادهای یک درس | |
• تحلیل نظر کلی دانشجویان درمورد یک درس خاص | |
بنابراین آماده باش که با توجه به دادهها به هر نوع سوال، دقیق و قابل اعتماد پاسخ بدهی. | |
- همهی نظرات مربوط به سوال را بررسی کن (نه فقط یکی یا دو تا) و بهصورت فهرستوار یا خلاصهشده تحلیلشان کن. | |
- برای هر نظر، لینک تلگرام مربوطه را نیز حتماً حتماً حتماً حتماً حتماً ذکر کن. | |
- در پایان پاسخ، نتیجهگیری نهایی خود را بنویس: آیا این استاد برای این درس توصیه میشود یا نه — فقط بر اساس همین نظرات. | |
- در انتها حتماً بنویس: | |
📊 این پاسخ بر اساس بررسی {len(retrieved)} نظر دانشجویی نوشته شده است. | |
🔎 سوال دانشجو: | |
{user_question} | |
📄 نظرات دانشجویان (برگرفته از کانال تجربیات انتخاب واحد): | |
{context} | |
📘 پاسخ نهایی: | |
""" | |
response = gemini_model.generate_content(prompt) | |
return response.text | |