import numpy as np from sklearn.metrics import mean_squared_error, roc_auc_score from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity from data_processing import load_query_dataset global ground_truth_answer, ground_truth_metrics,rmse_scores ground_truth_answer = '' ground_truth_metrics = {} rmse_scores = {} # def calculate_metrics(question, response, docs, time_taken): # data = load_ragbench() # retrieve_ground_truths(question, data) # # Predicted metrics # predicted_metrics = { # "ground_truth": ground_truth_answer, # "context_relevance": context_relevance(question, docs), # "context_utilization": context_utilization(response, docs), # "completeness": completeness(response, ground_truth_answer), # "adherence": adherence(response, docs), # "response_time" : time_taken # } # return predicted_metrics # def retrieve_ground_truths(question,ragbench_set): # for dataset_name in ragbench_set.keys(): # for split_name,instances in ragbench_set[dataset_name].items(): # Fixed: Removed extra '.' and corrected indentation # print(f"Processing {split_name} split") # for instance in instances: # Fixed: Corrected indentation # # Check if the question (data) matches the query # if instance['question'] == question: # # If a match is found, retrieve id and response # instance_id = instance['id'] # instance_response = instance['response'] # ground_truth_metrics = { # "context_relevance": instance['relevance_score'], # "context_utilization": instance['utilization_score'], # "completeness": instance['completeness_score'], # "adherence": instance['adherence_score'] # } # ground_truth_answer = instance_response # print(f"Match found in {split_name} split!") # print(f"ID: {instance_id}, Response: {instance_response}") # break # Exit after finding the first match (optional) # Step 1: Helper function to compute cosine similarity def compute_cosine_similarity(text1, text2): if not text1 or not text2: # Check for empty or None values print("Error: One or both input texts are empty. Returning similarity as 0.") return 0.0 vectorizer = TfidfVectorizer(stop_words="english") try: vectors = vectorizer.fit_transform([text1, text2]) similarity = cosine_similarity(vectors[0], vectors[1])[0][0] return similarity except ValueError as e: print(f"Error in vectorization: {e}. Returning similarity as 0.") return 0.0 # Step 2: Metric 1 - Context Relevance def context_relevance(question, relevant_documents): # combined_docs = " ".join([doc.page_content for doc in relevant_documents]) combined_docs = " ".join([doc for doc in relevant_documents]) return compute_cosine_similarity(question, combined_docs) # Step 3: Metric 2 - Context Utilization def context_utilization(response, relevant_documents): #combined_docs = " ".join([doc.page_content for doc in relevant_documents]) combined_docs = " ".join([doc for doc in relevant_documents]) return compute_cosine_similarity(response, combined_docs) # Step 4: Metric 3 - Completeness def completeness(response, ground_truth_answer): return compute_cosine_similarity(response, ground_truth_answer) # Step 5: Metric 4 - Adherence def adherence(response, relevant_documents): #combined_docs = " ".join([doc.page_content for doc in relevant_documents]) combined_docs = " ".join([doc for doc in relevant_documents]) response_tokens = set(response.split()) relevant_tokens = set(combined_docs.split()) supported_tokens = response_tokens.intersection(relevant_tokens) return len(supported_tokens) / len(response_tokens) # Step 6: Compute RMSE for metrics def compute_rmse(predicted_values, ground_truth_values): # Ensure that both predicted_values and ground_truth_values are numeric if all(isinstance(i, (int, float)) for i in predicted_values) and all(isinstance(i, (int, float)) for i in ground_truth_values): return np.sqrt(mean_squared_error(ground_truth_values, predicted_values)) else: print("Invalid input for RMSE calculation. Ensure all values are numeric.") return None def convert_adherence_to_numerical(adherence_score): if adherence_score: return 0.8 # True becomes 1 else: return 0.5 # False becomes 0 def retrieve_ground_truths(question, dataset,time_taken): """Retrieve the ground truth answer for a given question from the dataset.""" for split_name, instances in dataset.items(): for instance in instances: if instance['question'] == question: instance_response = instance['response'] adherence_numerical = convert_adherence_to_numerical(instance['adherence_score']) ground_truth_metrics = { "context_relevance": instance['relevance_score'], "context_utilization": instance['utilization_score'], "completeness": instance['completeness_score'], "adherence": adherence_numerical, "response_time": time_taken } return instance_response, ground_truth_metrics # Return the ground truth response immediately return None,None # Return None if no match is found def store_rmse(question, predicted_metrics, ground_truth_metrics): """Calculate and store RMSE for each metric.""" for metric_name in predicted_metrics: predicted_value = predicted_metrics[metric_name] # Get the corresponding ground truth value from ground_truth_metrics ground_truth_value = ground_truth_metrics.get(metric_name, None) # Debugging: Check the values being compared print(f"Comparing {metric_name}: Predicted = {predicted_value}, Ground Truth = {ground_truth_value}") # Ensure both predicted value and ground truth value are numeric before calculating RMSE if isinstance(predicted_value, (int, float)) and isinstance(ground_truth_value, (int, float)): rmse_value = compute_rmse([predicted_value], [ground_truth_value]) if rmse_value is not None: print(f"RMSE for {metric_name}: {rmse_value}") if question not in rmse_scores: rmse_scores[question] = {} rmse_scores[question][metric_name] = rmse_value else: print(f"Skipping RMSE for {metric_name}: One or both values are non-numeric") def calculate_metrics(question, q_dataset, response, docs, time_taken): data = load_query_dataset(q_dataset) ground_truth_answer, ground_truth_metrics = retrieve_ground_truths(question, data,time_taken) # Store the ground truth answer # Ensure ground_truth_answer is not empty before proceeding if ground_truth_answer is None: ground_truth_answer = "" # Default to an empty string if no ground truth is found # Convert ground truth to numeric form (e.g., using cosine similarity or some metric) # Here, let's assume completeness is based on cosine similarity between the response and the ground truth # ground_truth_completeness = compute_cosine_similarity(response, ground_truth_answer) # Predicted metrics # Predicted metrics predicted_metrics_rmse = { "context_relevance": context_relevance(question, docs), "context_utilization": context_utilization(response, docs), "completeness": compute_cosine_similarity(response, ground_truth_answer), #completeness(response, ground_truth_answer), "adherence": adherence(response, docs), "response_time": time_taken } store_rmse(question, predicted_metrics_rmse, ground_truth_metrics) # Now, make sure the values passed to RMSE calculation are numeric #predicted_completeness = predicted_metrics['completeness'] # Ensure both predicted_completeness and ground_truth_completeness are numeric before calculating RMSE ''' if isinstance(predicted_completeness, (int, float)) and isinstance(ground_truth_completeness, (int, float)): rmse_value = compute_rmse([predicted_completeness], [ground_truth_completeness]) predicted_metrics["rmse"] = rmse_value # Adding RMSE to metrics else: predicted_metrics["rmse"] = "Invalid RMSE calculation" ''' for metric_name in predicted_metrics_rmse: predicted_value = predicted_metrics_rmse[metric_name] print(f"RMSE for {metric_name}: {predicted_value}") for metric_name in ground_truth_metrics: ground_truth_value = ground_truth_metrics[metric_name] print(f"RMSE for {metric_name}: {ground_truth_value}") rmse_values = [] ground_truth_values = [] for metric_name in predicted_metrics_rmse: predicted_value = predicted_metrics_rmse[metric_name] ground_truth_value = ground_truth_metrics.get(metric_name, None) # Ensure both predicted and ground truth values are numeric if isinstance(predicted_value, (int, float)) and isinstance(ground_truth_value, (int, float)): rmse_values.append(predicted_value) ground_truth_values.append(ground_truth_value) else: print(f"Skipping RMSE for {metric_name}: One or both values are non-numeric") if rmse_values and ground_truth_values: overall_rmse = compute_rmse(rmse_values, ground_truth_values) print(f"Overall RMSE: {overall_rmse}") else: print("Invalid RMSE calculation due to non-numeric values.") predicted_metrics = { "RAG_model_response": response, "ground_truth": ground_truth_answer, "context_relevance": context_relevance(question, docs), "context_utilization": context_utilization(response, docs), "completeness": compute_cosine_similarity(response, ground_truth_answer), #completeness(response, ground_truth_answer), "adherence": adherence(response, docs), "response_time": time_taken, "rmse": overall_rmse } ''' if isinstance(predicted_metrics_rmse, (int, float)) and isinstance(ground_truth_metrics, (int, float)): rmse_value = compute_rmse(predicted_metrics_rmse.values(), ground_truth_metrics.values()) predicted_metrics_rmse["rmse"] = rmse_value # Adding RMSE to metrics else: predicted_metrics_rmse["rmse"] = "Invalid RMSE calculation" ''' return predicted_metrics ''' def retrieve_ground_truths(question, dataset): for split_name, instances in dataset.items(): print(f"Processing {split_name} split") for instance in instances: if instance['question'] == question: instance_id = instance['id'] instance_response = instance['response'] # ground_truth_metrics = { # "context_relevance": instance['relevance_score'], # "context_utilization": instance['utilization_score'], # "completeness": instance['completeness_score'], # "adherence": instance['adherence_score'] # } print(f"Match found in {split_name} split!") print(f"ID: {instance_id}, Response: {instance_response}") return instance_response # Return ground truth response immediately return None # Return None if no match is found '''