import numpy as np
from sklearn.metrics import mean_squared_error, roc_auc_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from data_processing import load_query_dataset
# Module-level state shared across metric calculations. (A bare `global`
# statement at module scope has no effect, so plain assignments suffice.)
ground_truth_answer = ''
ground_truth_metrics = {}
rmse_scores = {}
# def calculate_metrics(question, response, docs, time_taken):
# data = load_ragbench()
# retrieve_ground_truths(question, data)
# # Predicted metrics
# predicted_metrics = {
# "ground_truth": ground_truth_answer,
# "context_relevance": context_relevance(question, docs),
# "context_utilization": context_utilization(response, docs),
# "completeness": completeness(response, ground_truth_answer),
# "adherence": adherence(response, docs),
# "response_time" : time_taken
# }
# return predicted_metrics
# def retrieve_ground_truths(question,ragbench_set):
# for dataset_name in ragbench_set.keys():
# for split_name,instances in ragbench_set[dataset_name].items(): # Fixed: Removed extra '.' and corrected indentation
# print(f"Processing {split_name} split")
# for instance in instances: # Fixed: Corrected indentation
# # Check if the question (data) matches the query
# if instance['question'] == question:
# # If a match is found, retrieve id and response
# instance_id = instance['id']
# instance_response = instance['response']
# ground_truth_metrics = {
# "context_relevance": instance['relevance_score'],
# "context_utilization": instance['utilization_score'],
# "completeness": instance['completeness_score'],
# "adherence": instance['adherence_score']
# }
# ground_truth_answer = instance_response
# print(f"Match found in {split_name} split!")
# print(f"ID: {instance_id}, Response: {instance_response}")
# break # Exit after finding the first match (optional)
# Step 1: Helper function to compute cosine similarity
def compute_cosine_similarity(text1, text2):
if not text1 or not text2: # Check for empty or None values
print("Error: One or both input texts are empty. Returning similarity as 0.")
return 0.0
vectorizer = TfidfVectorizer(stop_words="english")
try:
vectors = vectorizer.fit_transform([text1, text2])
similarity = cosine_similarity(vectors[0], vectors[1])[0][0]
return similarity
except ValueError as e:
print(f"Error in vectorization: {e}. Returning similarity as 0.")
return 0.0
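# Illustrative check (assumed inputs, not part of the pipeline): texts that share
# vocabulary score close to 1.0, texts with no shared terms score 0.0, e.g.
#   compute_cosine_similarity("solar power generation", "generation of solar power")  # ~1.0
#   compute_cosine_similarity("solar power generation", "medieval French poetry")     # 0.0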
# Step 2: Metric 1 - Context Relevance
def context_relevance(question, relevant_documents):
# combined_docs = " ".join([doc.page_content for doc in relevant_documents])
    combined_docs = " ".join(relevant_documents)  # documents are plain strings here
return compute_cosine_similarity(question, combined_docs)
# Step 3: Metric 2 - Context Utilization
def context_utilization(response, relevant_documents):
#combined_docs = " ".join([doc.page_content for doc in relevant_documents])
    combined_docs = " ".join(relevant_documents)  # documents are plain strings here
return compute_cosine_similarity(response, combined_docs)
# Step 4: Metric 3 - Completeness
def completeness(response, ground_truth_answer):
return compute_cosine_similarity(response, ground_truth_answer)
# Step 5: Metric 4 - Adherence
def adherence(response, relevant_documents):
    #combined_docs = " ".join([doc.page_content for doc in relevant_documents])
    combined_docs = " ".join(relevant_documents)
    response_tokens = set(response.split())
    if not response_tokens:  # Guard against division by zero for an empty response
        return 0.0
    relevant_tokens = set(combined_docs.split())
    supported_tokens = response_tokens.intersection(relevant_tokens)
    return len(supported_tokens) / len(response_tokens)
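# Note: this adherence score is the fraction of unique response tokens that also
# appear in the retrieved documents (a token-level precision proxy). For example,
# if the response tokens are {"paris", "is", "the", "capital"} and all four occur
# in the documents, adherence is 4/4 = 1.0; if only two occur, it is 2/4 = 0.5.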
# Step 6: Compute RMSE for metrics
def compute_rmse(predicted_values, ground_truth_values):
# Ensure that both predicted_values and ground_truth_values are numeric
if all(isinstance(i, (int, float)) for i in predicted_values) and all(isinstance(i, (int, float)) for i in ground_truth_values):
return np.sqrt(mean_squared_error(ground_truth_values, predicted_values))
else:
print("Invalid input for RMSE calculation. Ensure all values are numeric.")
return None
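# Note: when compute_rmse is called with single-element lists (as store_rmse does
# below), the result reduces to the absolute difference between the two values,
# e.g. compute_rmse([0.9], [0.7]) is approximately 0.2.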
def convert_adherence_to_numerical(adherence_score):
    # Map the boolean adherence label onto a numeric score so it can be compared
    # with the predicted (continuous) adherence value.
    if adherence_score:
        return 0.8  # True maps to 0.8
    else:
        return 0.5  # False maps to 0.5
def retrieve_ground_truths(question, dataset, time_taken):
"""Retrieve the ground truth answer for a given question from the dataset."""
for split_name, instances in dataset.items():
for instance in instances:
if instance['question'] == question:
instance_response = instance['response']
adherence_numerical = convert_adherence_to_numerical(instance['adherence_score'])
ground_truth_metrics = {
"context_relevance": instance['relevance_score'],
"context_utilization": instance['utilization_score'],
"completeness": instance['completeness_score'],
"adherence": adherence_numerical,
"response_time": time_taken
}
return instance_response, ground_truth_metrics # Return the ground truth response immediately
    return None, None  # Return (None, None) if no match is found
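# Assumed instance layout (illustrative only, based on the fields accessed above;
# the actual RAGBench records may carry additional keys):
# {
#     "id": "q-001",
#     "question": "...",
#     "response": "...",
#     "relevance_score": 0.82,
#     "utilization_score": 0.74,
#     "completeness_score": 0.69,
#     "adherence_score": True,
# }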
def store_rmse(question, predicted_metrics, ground_truth_metrics):
"""Calculate and store RMSE for each metric."""
for metric_name in predicted_metrics:
predicted_value = predicted_metrics[metric_name]
# Get the corresponding ground truth value from ground_truth_metrics
ground_truth_value = ground_truth_metrics.get(metric_name, None)
# Debugging: Check the values being compared
print(f"Comparing {metric_name}: Predicted = {predicted_value}, Ground Truth = {ground_truth_value}")
# Ensure both predicted value and ground truth value are numeric before calculating RMSE
if isinstance(predicted_value, (int, float)) and isinstance(ground_truth_value, (int, float)):
rmse_value = compute_rmse([predicted_value], [ground_truth_value])
if rmse_value is not None:
print(f"RMSE for {metric_name}: {rmse_value}")
if question not in rmse_scores:
rmse_scores[question] = {}
rmse_scores[question][metric_name] = rmse_value
else:
print(f"Skipping RMSE for {metric_name}: One or both values are non-numeric")
def calculate_metrics(question, q_dataset, response, docs, time_taken):
data = load_query_dataset(q_dataset)
    ground_truth_answer, ground_truth_metrics = retrieve_ground_truths(question, data, time_taken)  # Store the ground truth answer and metrics
    # Ensure the ground truth values are usable before proceeding
    if ground_truth_answer is None:
        ground_truth_answer = ""  # Default to an empty string if no ground truth is found
    if ground_truth_metrics is None:
        ground_truth_metrics = {}  # Default to an empty dict so later lookups do not fail
# Convert ground truth to numeric form (e.g., using cosine similarity or some metric)
# Here, let's assume completeness is based on cosine similarity between the response and the ground truth
# ground_truth_completeness = compute_cosine_similarity(response, ground_truth_answer)
    # Predicted metrics
predicted_metrics_rmse = {
"context_relevance": context_relevance(question, docs),
"context_utilization": context_utilization(response, docs),
"completeness": compute_cosine_similarity(response, ground_truth_answer), #completeness(response, ground_truth_answer),
"adherence": adherence(response, docs),
"response_time": time_taken
}
store_rmse(question, predicted_metrics_rmse, ground_truth_metrics)
# Now, make sure the values passed to RMSE calculation are numeric
#predicted_completeness = predicted_metrics['completeness']
# Ensure both predicted_completeness and ground_truth_completeness are numeric before calculating RMSE
'''
if isinstance(predicted_completeness, (int, float)) and isinstance(ground_truth_completeness, (int, float)):
rmse_value = compute_rmse([predicted_completeness], [ground_truth_completeness])
predicted_metrics["rmse"] = rmse_value # Adding RMSE to metrics
else:
predicted_metrics["rmse"] = "Invalid RMSE calculation"
'''
    for metric_name in predicted_metrics_rmse:
        predicted_value = predicted_metrics_rmse[metric_name]
        print(f"Predicted {metric_name}: {predicted_value}")
    for metric_name in ground_truth_metrics:
        ground_truth_value = ground_truth_metrics[metric_name]
        print(f"Ground truth {metric_name}: {ground_truth_value}")
    # Collect the numeric metric pairs and compute one overall RMSE across them
    overall_rmse = None  # Remains None if no numeric metric pairs are available
    rmse_values = []
    ground_truth_values = []
for metric_name in predicted_metrics_rmse:
predicted_value = predicted_metrics_rmse[metric_name]
ground_truth_value = ground_truth_metrics.get(metric_name, None)
# Ensure both predicted and ground truth values are numeric
if isinstance(predicted_value, (int, float)) and isinstance(ground_truth_value, (int, float)):
rmse_values.append(predicted_value)
ground_truth_values.append(ground_truth_value)
else:
print(f"Skipping RMSE for {metric_name}: One or both values are non-numeric")
if rmse_values and ground_truth_values:
overall_rmse = compute_rmse(rmse_values, ground_truth_values)
print(f"Overall RMSE: {overall_rmse}")
else:
print("Invalid RMSE calculation due to non-numeric values.")
    # Final metrics dictionary; reuse the values already computed above instead of
    # recomputing each similarity.
    predicted_metrics = {
        "RAG_model_response": response,
        "ground_truth": ground_truth_answer,
        **predicted_metrics_rmse,
        "rmse": overall_rmse
    }
'''
if isinstance(predicted_metrics_rmse, (int, float)) and isinstance(ground_truth_metrics, (int, float)):
rmse_value = compute_rmse(predicted_metrics_rmse.values(), ground_truth_metrics.values())
predicted_metrics_rmse["rmse"] = rmse_value # Adding RMSE to metrics
else:
predicted_metrics_rmse["rmse"] = "Invalid RMSE calculation"
'''
return predicted_metrics
''' def retrieve_ground_truths(question, dataset):
for split_name, instances in dataset.items():
print(f"Processing {split_name} split")
for instance in instances:
if instance['question'] == question:
instance_id = instance['id']
instance_response = instance['response']
# ground_truth_metrics = {
# "context_relevance": instance['relevance_score'],
# "context_utilization": instance['utilization_score'],
# "completeness": instance['completeness_score'],
# "adherence": instance['adherence_score']
# }
print(f"Match found in {split_name} split!")
print(f"ID: {instance_id}, Response: {instance_response}")
return instance_response # Return ground truth response immediately
return None # Return None if no match is found
'''
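# Minimal usage sketch (assumptions: the dataset name, question, response, and
# retrieved documents below are placeholders; load_query_dataset must be able to
# resolve the given dataset name):
#
#   import time
#   start = time.time()
#   docs = ["Paris is the capital of France.", "France is in western Europe."]
#   answer = "Paris is the capital of France."
#   metrics = calculate_metrics(
#       question="What is the capital of France?",
#       q_dataset="hotpotqa",  # hypothetical dataset name
#       response=answer,
#       docs=docs,
#       time_taken=time.time() - start,
#   )
#   print(metrics["completeness"], metrics["rmse"])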