from sklearn.metrics.pairwise import cosine_similarity import numpy as np import pandas as pd import json import requests from tqdm import tqdm import csv import logging import sys from transformers import pipeline # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', stream=sys.stdout ) logger = logging.getLogger(__name__) class EvaluationSystem: def __init__(self, data_processor, database_handler): self.data_processor = data_processor self.db_handler = database_handler # Initialize the model self.model = pipeline( "text-generation", model="google/flan-t5-base", device=-1 # Use CPU ) logger.info("Initialized evaluation system with flan-t5-base model") def generate_llm_response(self, prompt): """Generate response using Hugging Face model""" try: response = self.model( prompt, max_length=512, min_length=64, num_return_sequences=1 )[0]['generated_text'] return response except Exception as e: logger.error(f"Error generating response: {str(e)}") return None def relevance_scoring(self, query, retrieved_docs, top_k=5): query_embedding = self.data_processor.embedding_model.encode(query) doc_embeddings = [self.data_processor.embedding_model.encode(doc['content']) for doc in retrieved_docs] similarities = cosine_similarity([query_embedding], doc_embeddings)[0] return np.mean(sorted(similarities, reverse=True)[:top_k]) def answer_similarity(self, generated_answer, reference_answer): gen_embedding = self.data_processor.embedding_model.encode(generated_answer) ref_embedding = self.data_processor.embedding_model.encode(reference_answer) return cosine_similarity([gen_embedding], [ref_embedding])[0][0] def human_evaluation(self, video_id, query): with self.db_handler.conn: cursor = self.db_handler.conn.cursor() cursor.execute(''' SELECT AVG(feedback) FROM user_feedback WHERE video_id = ? AND query = ? ''', (video_id, query)) result = cursor.fetchone() return result[0] if result[0] is not None else 0 def llm_as_judge(self, question, generated_answer, prompt_template): prompt = prompt_template.format( question=question, answer_llm=generated_answer ) try: response = self.generate_llm_response(prompt) if response: # Try to parse JSON response try: evaluation = json.loads(response) return evaluation except json.JSONDecodeError: logger.error("Failed to parse LLM response as JSON") return None return None except Exception as e: logger.error(f"Error in LLM evaluation: {str(e)}") return None def evaluate_rag(self, rag_system, ground_truth_file, prompt_template=None): try: ground_truth = pd.read_csv(ground_truth_file) except FileNotFoundError: logger.error("Ground truth file not found. Please generate ground truth data first.") return None evaluations = [] for _, row in tqdm(ground_truth.iterrows(), total=len(ground_truth)): question = row['question'] video_id = row['video_id'] index_name = self.db_handler.get_elasticsearch_index_by_youtube_id(video_id) if not index_name: logger.warning(f"No index found for video {video_id}. Skipping this question.") continue try: answer_llm, _ = rag_system.query(question, search_method='hybrid', index_name=index_name) except ValueError as e: logger.error(f"Error querying RAG system: {str(e)}") continue if prompt_template: evaluation = self.llm_as_judge(question, answer_llm, prompt_template) if evaluation: evaluations.append({ 'video_id': str(video_id), 'question': str(question), 'answer': str(answer_llm), 'relevance': str(evaluation.get('Relevance', 'UNKNOWN')), 'explanation': str(evaluation.get('Explanation', 'No explanation provided')) }) else: similarity = self.answer_similarity(answer_llm, row.get('reference_answer', '')) evaluations.append({ 'video_id': str(video_id), 'question': str(question), 'answer': str(answer_llm), 'relevance': f"Similarity: {similarity}", 'explanation': "Cosine similarity used for evaluation" }) # Save evaluations to CSV if evaluations: csv_path = 'data/evaluation_results.csv' with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile: fieldnames = ['video_id', 'question', 'answer', 'relevance', 'explanation'] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() for eval_data in evaluations: writer.writerow(eval_data) logger.info(f"Evaluation results saved to {csv_path}") # Save evaluations to database self.save_evaluations_to_db(evaluations) return evaluations def save_evaluations_to_db(self, evaluations): for eval_data in evaluations: self.db_handler.save_rag_evaluation(eval_data) logger.info("Evaluation results saved to database") def hit_rate(self, relevance_total): return sum(any(line) for line in relevance_total) / len(relevance_total) def mrr(self, relevance_total): scores = [] for line in relevance_total: for rank, relevant in enumerate(line, 1): if relevant: scores.append(1 / rank) break else: scores.append(0) return sum(scores) / len(scores) def simple_optimize(self, param_ranges, objective_function, n_iterations=10): best_params = None best_score = float('-inf') for _ in range(n_iterations): current_params = {param: np.random.uniform(min_val, max_val) for param, (min_val, max_val) in param_ranges.items()} current_score = objective_function(current_params) if current_score > best_score: best_score = current_score best_params = current_params return best_params, best_score def evaluate_search(self, ground_truth, search_function): relevance_total = [] for _, row in tqdm(ground_truth.iterrows(), total=len(ground_truth)): video_id = row['video_id'] results = search_function(row['question'], video_id) relevance = [d['video_id'] == video_id for d in results] relevance_total.append(relevance) return { 'hit_rate': self.hit_rate(relevance_total), 'mrr': self.mrr(relevance_total), } def run_full_evaluation(self, rag_system, ground_truth_file, prompt_template=None): # Load ground truth ground_truth = pd.read_csv(ground_truth_file) # Evaluate RAG rag_evaluations = self.evaluate_rag(rag_system, ground_truth_file, prompt_template) # Evaluate search performance def search_function(query, video_id): index_name = self.db_handler.get_elasticsearch_index_by_youtube_id(video_id) if index_name: return rag_system.data_processor.search(query, num_results=10, method='hybrid', index_name=index_name) return [] search_performance = self.evaluate_search(ground_truth, search_function) # Optimize search parameters param_ranges = {'content': (0.0, 3.0)} # Example parameter range def objective_function(params): def parameterized_search(query, video_id): index_name = self.db_handler.get_elasticsearch_index_by_youtube_id(video_id) if index_name: return rag_system.data_processor.search( query, num_results=10, method='hybrid', index_name=index_name, boost_dict=params ) return [] return self.evaluate_search(ground_truth, parameterized_search)['mrr'] best_params, best_score = self.simple_optimize(param_ranges, objective_function) return { "rag_evaluations": rag_evaluations, "search_performance": search_performance, "best_params": best_params, "best_score": best_score }