import numpy as np
from sklearn.metrics import mean_squared_error
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from data_processing import load_query_dataset

# Module-level state shared across evaluation calls
ground_truth_answer = ''
ground_truth_metrics = {}
rmse_scores = {}
# Step 1: Helper function to compute cosine similarity
def compute_cosine_similarity(text1, text2):
    if not text1 or not text2:  # Guard against empty or None inputs
        print("Error: One or both input texts are empty. Returning similarity as 0.")
        return 0.0
    vectorizer = TfidfVectorizer(stop_words="english")
    try:
        vectors = vectorizer.fit_transform([text1, text2])
        similarity = cosine_similarity(vectors[0], vectors[1])[0][0]
        return similarity
    except ValueError as e:
        print(f"Error in vectorization: {e}. Returning similarity as 0.")
        return 0.0
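# Example (hypothetical strings, for illustration only):
#   compute_cosine_similarity("what is retrieval augmented generation",
#                             "retrieval augmented generation combines search with an LLM")
# builds TF-IDF vectors for the two texts and returns their cosine similarity in [0, 1],
# driven here by the shared non-stopword terms.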
# Step 2: Metric 1 - Context Relevance
def context_relevance(question, relevant_documents):
    # combined_docs = " ".join([doc.page_content for doc in relevant_documents])
    combined_docs = " ".join(relevant_documents)  # documents are passed as plain strings
    return compute_cosine_similarity(question, combined_docs)
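# Illustrative call (hypothetical inputs; retrieved chunks passed as plain strings):
#   context_relevance("Who wrote Hamlet?",
#                     ["Hamlet is a tragedy by William Shakespeare.",
#                      "It was written around 1600."])
# returns the TF-IDF cosine similarity between the question and the concatenated chunks.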
# Step 3: Metric 2 - Context Utilization
def context_utilization(response, relevant_documents):
    # combined_docs = " ".join([doc.page_content for doc in relevant_documents])
    combined_docs = " ".join(relevant_documents)
    return compute_cosine_similarity(response, combined_docs)

# Step 4: Metric 3 - Completeness
def completeness(response, ground_truth_answer):
    return compute_cosine_similarity(response, ground_truth_answer)
# Step 5: Metric 4 - Adherence
def adherence(response, relevant_documents):
    # combined_docs = " ".join([doc.page_content for doc in relevant_documents])
    combined_docs = " ".join(relevant_documents)
    response_tokens = set(response.split())
    if not response_tokens:  # Avoid division by zero on an empty response
        return 0.0
    relevant_tokens = set(combined_docs.split())
    supported_tokens = response_tokens.intersection(relevant_tokens)
    return len(supported_tokens) / len(response_tokens)
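# Worked example (hypothetical): for the response "Paris is the capital" and a retrieved
# document "Paris is the capital of France", all 4 response tokens appear in the document,
# so adherence = 4 / 4 = 1.0; tokens unsupported by the retrieved text lower the score.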
# Step 6: Compute RMSE for metrics
def compute_rmse(predicted_values, ground_truth_values):
    # Ensure that both predicted_values and ground_truth_values are numeric
    if all(isinstance(i, (int, float)) for i in predicted_values) and all(isinstance(i, (int, float)) for i in ground_truth_values):
        return np.sqrt(mean_squared_error(ground_truth_values, predicted_values))
    else:
        print("Invalid input for RMSE calculation. Ensure all values are numeric.")
        return None
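# Example (hypothetical values): compute_rmse([0.8, 0.6], [1.0, 0.5])
#   = sqrt(((1.0 - 0.8)**2 + (0.5 - 0.6)**2) / 2) ≈ 0.158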
def convert_adherence_to_numerical(adherence_score):
    """Map the boolean adherence label to a numeric score so it can enter the RMSE comparison."""
    if adherence_score:
        return 0.8  # True maps to 0.8
    else:
        return 0.5  # False maps to 0.5
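# Example: convert_adherence_to_numerical(True) -> 0.8, convert_adherence_to_numerical(False) -> 0.5.
# The 0.8 / 0.5 anchors are a heuristic choice, not values prescribed by the dataset.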
def retrieve_ground_truths(question, dataset, time_taken):
    """Retrieve the ground truth answer and metric scores for a given question from the dataset."""
    for split_name, instances in dataset.items():
        for instance in instances:
            if instance['question'] == question:
                instance_response = instance['response']
                adherence_numerical = convert_adherence_to_numerical(instance['adherence_score'])
                ground_truth_metrics = {
                    "context_relevance": instance['relevance_score'],
                    "context_utilization": instance['utilization_score'],
                    "completeness": instance['completeness_score'],
                    "adherence": adherence_numerical,
                    "response_time": time_taken
                }
                return instance_response, ground_truth_metrics  # Return the ground truth immediately
    return None, None  # Return None if no match is found
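# Assumed dataset shape, inferred from the keys accessed above (not guaranteed by load_query_dataset):
# {
#     "test": [
#         {"question": "...", "response": "...", "relevance_score": 0.9,
#          "utilization_score": 0.7, "completeness_score": 0.8, "adherence_score": True},
#     ]
# }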
def store_rmse(question, predicted_metrics, ground_truth_metrics):
    """Calculate and store RMSE for each metric."""
    for metric_name in predicted_metrics:
        predicted_value = predicted_metrics[metric_name]
        # Get the corresponding ground truth value from ground_truth_metrics
        ground_truth_value = ground_truth_metrics.get(metric_name, None)
        # Debugging: Check the values being compared
        print(f"Comparing {metric_name}: Predicted = {predicted_value}, Ground Truth = {ground_truth_value}")
        # Ensure both values are numeric before calculating RMSE
        if isinstance(predicted_value, (int, float)) and isinstance(ground_truth_value, (int, float)):
            rmse_value = compute_rmse([predicted_value], [ground_truth_value])
            if rmse_value is not None:
                print(f"RMSE for {metric_name}: {rmse_value}")
                if question not in rmse_scores:
                    rmse_scores[question] = {}
                rmse_scores[question][metric_name] = rmse_value
        else:
            print(f"Skipping RMSE for {metric_name}: One or both values are non-numeric")
def calculate_metrics(question, q_dataset, response, docs, time_taken):
    data = load_query_dataset(q_dataset)
    # Look up the ground truth answer and metric scores for this question
    ground_truth_answer, ground_truth_metrics = retrieve_ground_truths(question, data, time_taken)

    # Fall back to safe defaults when no ground truth is found
    if ground_truth_answer is None:
        ground_truth_answer = ""
    if ground_truth_metrics is None:
        ground_truth_metrics = {}

    # Predicted metrics used for the RMSE comparison against the ground truth scores
    predicted_metrics_rmse = {
        "context_relevance": context_relevance(question, docs),
        "context_utilization": context_utilization(response, docs),
        "completeness": compute_cosine_similarity(response, ground_truth_answer),
        "adherence": adherence(response, docs),
        "response_time": time_taken
    }
    store_rmse(question, predicted_metrics_rmse, ground_truth_metrics)

    # Debugging: show the values being compared
    for metric_name, predicted_value in predicted_metrics_rmse.items():
        print(f"Predicted {metric_name}: {predicted_value}")
    for metric_name, ground_truth_value in ground_truth_metrics.items():
        print(f"Ground truth {metric_name}: {ground_truth_value}")

    # Collect the numeric metric pairs and compute a single overall RMSE across them
    rmse_values = []
    ground_truth_values = []
    for metric_name in predicted_metrics_rmse:
        predicted_value = predicted_metrics_rmse[metric_name]
        ground_truth_value = ground_truth_metrics.get(metric_name, None)
        # Ensure both predicted and ground truth values are numeric
        if isinstance(predicted_value, (int, float)) and isinstance(ground_truth_value, (int, float)):
            rmse_values.append(predicted_value)
            ground_truth_values.append(ground_truth_value)
        else:
            print(f"Skipping RMSE for {metric_name}: One or both values are non-numeric")

    overall_rmse = None  # Defined up front so the result dict never references an unbound name
    if rmse_values and ground_truth_values:
        overall_rmse = compute_rmse(rmse_values, ground_truth_values)
        print(f"Overall RMSE: {overall_rmse}")
    else:
        print("Invalid RMSE calculation due to non-numeric values.")

    # Final report; reuses the already computed per-metric scores
    predicted_metrics = {
        "RAG_model_response": response,
        "ground_truth": ground_truth_answer,
        "context_relevance": predicted_metrics_rmse["context_relevance"],
        "context_utilization": predicted_metrics_rmse["context_utilization"],
        "completeness": predicted_metrics_rmse["completeness"],
        "adherence": predicted_metrics_rmse["adherence"],
        "response_time": time_taken,
        "rmse": overall_rmse
    }
    return predicted_metrics
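# Minimal usage sketch (hypothetical dataset name, question, retrieved chunks, and timing;
# adjust to whatever load_query_dataset actually expects in this project):
if __name__ == "__main__":
    sample_question = "Who wrote Hamlet?"
    sample_response = "Hamlet was written by William Shakespeare."
    sample_docs = [
        "Hamlet is a tragedy written by William Shakespeare around 1600.",
        "Shakespeare's plays include Hamlet, Macbeth, and Othello."
    ]
    metrics = calculate_metrics(sample_question, "ragbench_test", sample_response, sample_docs, time_taken=1.25)
    print(metrics)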