23RAG7 / evaluation.py
vamseelatha2002's picture
Update evaluation.py
41619a3 verified
import numpy as np
from sklearn.metrics import mean_squared_error, roc_auc_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from data_processing import load_query_dataset
# Module-level state. A `global` statement is only meaningful inside a
# function body, so these names are simply assigned here.

# Placeholder default; calculate_metrics works with its own local of this name.
ground_truth_answer = ''
# Placeholder default; the functions in this section receive or build their
# own ground-truth metric mappings rather than reading this one.
ground_truth_metrics = {}
# question -> {metric_name: rmse_value}; populated by store_rmse.
rmse_scores = {}
# def calculate_metrics(question, response, docs, time_taken):
# data = load_ragbench()
# retrieve_ground_truths(question, data)
# # Predicted metrics
# predicted_metrics = {
# "ground_truth": ground_truth_answer,
# "context_relevance": context_relevance(question, docs),
# "context_utilization": context_utilization(response, docs),
# "completeness": completeness(response, ground_truth_answer),
# "adherence": adherence(response, docs),
# "response_time" : time_taken
# }
# return predicted_metrics
# def retrieve_ground_truths(question,ragbench_set):
# for dataset_name in ragbench_set.keys():
# for split_name,instances in ragbench_set[dataset_name].items(): # Fixed: Removed extra '.' and corrected indentation
# print(f"Processing {split_name} split")
# for instance in instances: # Fixed: Corrected indentation
# # Check if the question (data) matches the query
# if instance['question'] == question:
# # If a match is found, retrieve id and response
# instance_id = instance['id']
# instance_response = instance['response']
# ground_truth_metrics = {
# "context_relevance": instance['relevance_score'],
# "context_utilization": instance['utilization_score'],
# "completeness": instance['completeness_score'],
# "adherence": instance['adherence_score']
# }
# ground_truth_answer = instance_response
# print(f"Match found in {split_name} split!")
# print(f"ID: {instance_id}, Response: {instance_response}")
# break # Exit after finding the first match (optional)
# Step 1: Helper function to compute cosine similarity
def compute_cosine_similarity(text1, text2):
    """Return the TF-IDF cosine similarity between two texts.

    Both texts are vectorized together with a ``TfidfVectorizer`` configured
    to drop English stop words, and the cosine similarity between the two
    resulting vectors is returned.

    If either text is empty or ``None``, or if vectorization raises
    ``ValueError``, a message is printed and ``0.0`` is returned instead.
    """
    # Guard: nothing to compare when either side is missing.
    if not (text1 and text2):
        print("Error: One or both input texts are empty. Returning similarity as 0.")
        return 0.0

    tfidf = TfidfVectorizer(stop_words="english")
    try:
        matrix = tfidf.fit_transform([text1, text2])
        # cosine_similarity returns a 2-D array; [0][0] is the single score.
        return cosine_similarity(matrix[0], matrix[1])[0][0]
    except ValueError as err:
        print(f"Error in vectorization: {err}. Returning similarity as 0.")
        return 0.0
# Step 2: Metric 1 - Context Relevance
def context_relevance(question, relevant_documents):
    """Score how similar the question is to the retrieved documents.

    The documents are joined with single spaces into one text (so every item
    must be a ``str``) and that text is compared against ``question`` using
    ``compute_cosine_similarity``.
    """
    merged_context = " ".join(relevant_documents)
    return compute_cosine_similarity(question, merged_context)
# Step 3: Metric 2 - Context Utilization
def context_utilization(response, relevant_documents):
    """Score how similar the response is to the retrieved documents.

    The documents are joined with single spaces into one text (so every item
    must be a ``str``) and that text is compared against ``response`` using
    ``compute_cosine_similarity``.
    """
    merged_context = " ".join(relevant_documents)
    return compute_cosine_similarity(response, merged_context)
# Step 4: Metric 3 - Completeness
def completeness(response, ground_truth_answer):
    """Score how closely the response matches the ground-truth answer.

    This is the value of ``compute_cosine_similarity`` applied to the two
    texts, returned unchanged.
    """
    score = compute_cosine_similarity(response, ground_truth_answer)
    return score
# Step 5: Metric 4 - Adherence
def adherence(response, relevant_documents):
    """Return the fraction of distinct response tokens found in the documents.

    Tokens are produced by whitespace splitting; comparison is exact and
    case-sensitive. The documents are joined with single spaces before being
    tokenized, so every item must be a ``str``.

    Args:
        response: The generated answer text. ``None`` or a string without
            any tokens yields ``0.0``.
        relevant_documents: Iterable of document strings.

    Returns:
        A float in ``[0.0, 1.0]``: supported distinct tokens divided by all
        distinct response tokens.
    """
    response_tokens = set((response or "").split())
    # An empty/whitespace-only response has no tokens to support; return 0.0
    # instead of dividing by zero.
    if not response_tokens:
        return 0.0

    document_tokens = set(" ".join(relevant_documents).split())
    supported_tokens = response_tokens & document_tokens
    return len(supported_tokens) / len(response_tokens)
# Step 6: Compute RMSE for metrics
def compute_rmse(predicted_values, ground_truth_values):
    """Return the root-mean-squared error between two numeric sequences.

    Args:
        predicted_values: Iterable of ``int``/``float`` predictions.
        ground_truth_values: Iterable of ``int``/``float`` reference values,
            the same length as ``predicted_values``.

    Returns:
        The RMSE, or ``None`` (after printing a message) when any value is
        not an ``int``/``float``, when the inputs are empty, or when their
        lengths differ.
    """
    # Materialize once so one-shot iterables (generators, dict views) are not
    # exhausted by the validation pass before the actual computation.
    predicted = list(predicted_values)
    expected = list(ground_truth_values)

    if not all(isinstance(value, (int, float)) for value in predicted + expected):
        print("Invalid input for RMSE calculation. Ensure all values are numeric.")
        return None

    # all() is vacuously true for empty input, and mean_squared_error raises
    # on empty or mismatched inputs; report those like other invalid input.
    if not predicted or len(predicted) != len(expected):
        print("Invalid input for RMSE calculation. Inputs must be non-empty and of equal length.")
        return None

    return np.sqrt(mean_squared_error(expected, predicted))
def convert_adherence_to_numerical(adherence_score):
    """Map a truthy/falsy adherence flag to a numeric score.

    Returns ``0.8`` when ``adherence_score`` is truthy and ``0.5`` otherwise.

    NOTE(review): the mapping is 0.8/0.5 rather than 1/0 - confirm these
    constants are the intended calibration.
    """
    return 0.8 if adherence_score else 0.5
def retrieve_ground_truths(question, dataset, time_taken):
    """Look up the reference response and scores for ``question``.

    Every split in ``dataset`` (a mapping whose values are iterables of
    instance mappings) is scanned in order, stopping at the first instance
    whose ``'question'`` equals ``question``.

    Returns:
        ``(response, metrics)`` for the first match. ``metrics`` holds the
        instance's relevance, utilization and completeness scores, its
        adherence score passed through ``convert_adherence_to_numerical``,
        and ``time_taken`` under ``"response_time"``. When nothing matches,
        ``(None, None)`` is returned.
    """
    for _split, records in dataset.items():
        for record in records:
            if record['question'] != question:
                continue

            reference_response = record['response']
            adherence_value = convert_adherence_to_numerical(record['adherence_score'])
            metrics = dict(
                context_relevance=record['relevance_score'],
                context_utilization=record['utilization_score'],
                completeness=record['completeness_score'],
                adherence=adherence_value,
                response_time=time_taken,
            )
            # First match wins; later duplicates are never inspected.
            return reference_response, metrics

    return None, None
def store_rmse(question, predicted_metrics, ground_truth_metrics):
    """Compute the per-metric RMSE for one question and record it.

    For every metric in ``predicted_metrics`` whose predicted and
    ground-truth values are both ``int``/``float``, the RMSE of that single
    pair is computed with ``compute_rmse`` and stored in the module-level
    ``rmse_scores`` under ``rmse_scores[question][metric_name]``. Other
    metrics are skipped with a printed message.

    Args:
        question: Key under which the scores are stored.
        predicted_metrics: Mapping of metric name -> predicted value.
        ground_truth_metrics: Mapping of metric name -> reference value, or
            ``None`` when no ground truth is available (every metric is
            then skipped).
    """
    # No ground truth at all: behave as if every reference value is missing
    # rather than failing on None.get(...).
    if ground_truth_metrics is None:
        ground_truth_metrics = {}

    for metric_name, predicted_value in predicted_metrics.items():
        ground_truth_value = ground_truth_metrics.get(metric_name)
        # Debugging aid: show the pair being compared.
        print(f"Comparing {metric_name}: Predicted = {predicted_value}, Ground Truth = {ground_truth_value}")

        both_numeric = (
            isinstance(predicted_value, (int, float))
            and isinstance(ground_truth_value, (int, float))
        )
        if not both_numeric:
            print(f"Skipping RMSE for {metric_name}: One or both values are non-numeric")
            continue

        rmse_value = compute_rmse([predicted_value], [ground_truth_value])
        if rmse_value is None:
            continue

        print(f"RMSE for {metric_name}: {rmse_value}")
        rmse_scores.setdefault(question, {})[metric_name] = rmse_value
def calculate_metrics(question, q_dataset, response, docs, time_taken):
    """Compute evaluation metrics for one RAG response.

    Loads the dataset via ``load_query_dataset(q_dataset)``, looks up the
    ground truth for ``question``, scores the response, records per-metric
    RMSE through ``store_rmse`` and computes one overall RMSE across all
    metrics that have numeric predicted and ground-truth values.

    Args:
        question: The user query; also the lookup key for the ground truth.
        q_dataset: Passed unchanged to ``load_query_dataset``.
        response: The generated answer text.
        docs: Retrieved documents as an iterable of strings.
        time_taken: Response time, reported as ``"response_time"``.

    Returns:
        A dict with keys ``"RAG_model_response"``, ``"ground_truth"``,
        ``"context_relevance"``, ``"context_utilization"``,
        ``"completeness"``, ``"adherence"``, ``"response_time"`` and
        ``"rmse"``. ``"rmse"`` is ``None`` when no metric has numeric
        predicted and ground-truth values (e.g. the question has no match).
    """
    data = load_query_dataset(q_dataset)
    ground_truth_answer, ground_truth_metrics = retrieve_ground_truths(question, data, time_taken)

    # No match in the dataset: fall back to empty values so the remaining
    # steps run and simply skip the RMSE comparisons.
    if ground_truth_answer is None:
        ground_truth_answer = ""
    if ground_truth_metrics is None:
        ground_truth_metrics = {}

    # Compute each score once and reuse it for both RMSE and the result.
    predicted_scores = {
        "context_relevance": context_relevance(question, docs),
        "context_utilization": context_utilization(response, docs),
        "completeness": compute_cosine_similarity(response, ground_truth_answer),
        "adherence": adherence(response, docs),
        "response_time": time_taken,
    }

    store_rmse(question, predicted_scores, ground_truth_metrics)

    # Debug output of the raw values on both sides of the comparison.
    for metric_name, predicted_value in predicted_scores.items():
        print(f"Predicted {metric_name}: {predicted_value}")
    for metric_name, ground_truth_value in ground_truth_metrics.items():
        print(f"Ground truth {metric_name}: {ground_truth_value}")

    # Collect the metric pairs that are numeric on both sides.
    # NOTE(review): retrieve_ground_truths sets the ground-truth
    # "response_time" to time_taken, so that pair always contributes zero
    # error to the overall RMSE - confirm this is intended.
    paired_predicted = []
    paired_truth = []
    for metric_name, predicted_value in predicted_scores.items():
        ground_truth_value = ground_truth_metrics.get(metric_name)
        if isinstance(predicted_value, (int, float)) and isinstance(ground_truth_value, (int, float)):
            paired_predicted.append(predicted_value)
            paired_truth.append(ground_truth_value)
        else:
            print(f"Skipping RMSE for {metric_name}: One or both values are non-numeric")

    # Defined up front so the result dict can always be built.
    overall_rmse = None
    if paired_predicted:
        overall_rmse = compute_rmse(paired_predicted, paired_truth)
        print(f"Overall RMSE: {overall_rmse}")
    else:
        print("Invalid RMSE calculation due to non-numeric values.")

    return {
        "RAG_model_response": response,
        "ground_truth": ground_truth_answer,
        **predicted_scores,
        "rmse": overall_rmse,
    }
''' def retrieve_ground_truths(question, dataset):
for split_name, instances in dataset.items():
print(f"Processing {split_name} split")
for instance in instances:
if instance['question'] == question:
instance_id = instance['id']
instance_response = instance['response']
# ground_truth_metrics = {
# "context_relevance": instance['relevance_score'],
# "context_utilization": instance['utilization_score'],
# "completeness": instance['completeness_score'],
# "adherence": instance['adherence_score']
# }
print(f"Match found in {split_name} split!")
print(f"ID: {instance_id}, Response: {instance_response}")
return instance_response # Return ground truth response immediately
return None # Return None if no match is found
'''