"""
Populate the GuardBench leaderboard from HuggingFace datasets.
"""
import json
import os
import pandas as pd
import tempfile
from typing import Dict, List, Optional
from datetime import datetime
import numpy as np
from huggingface_hub import hf_hub_download, HfApi
from datasets import load_dataset
from src.display.utils import GUARDBENCH_COLUMN, DISPLAY_COLS, CATEGORIES
from src.envs import RESULTS_DATASET_ID, TOKEN, CACHE_PATH
from src.leaderboard.processor import leaderboard_to_dataframe


def get_latest_leaderboard(version="v0") -> Optional[Dict]:
    """
    Get the latest leaderboard data from the HuggingFace dataset.
    """
    try:
        # Try to download the leaderboard file
        leaderboard_path = hf_hub_download(
            repo_id=RESULTS_DATASET_ID,
            filename=f"leaderboards/leaderboard_{version}.json",
            repo_type="dataset",
            token=TOKEN
        )
        with open(leaderboard_path, 'r') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error downloading leaderboard: {e}")
        return None


def get_model_entry(model_name: str, mode: str, version="v0") -> Optional[Dict]:
    """
    Get a specific model's entry from the entries folder, uniquely identified by model_name, mode, and version.
    """
    try:
        model_name_safe = model_name.replace("/", "_").replace(" ", "_")
        mode_safe = str(mode).replace("/", "_").replace(" ", "_").lower()
        entry_path = hf_hub_download(
            repo_id=RESULTS_DATASET_ID,
            filename=f"entries/entry_{model_name_safe}_{mode_safe}_{version}.json",
            repo_type="dataset",
            token=TOKEN
        )
        with open(entry_path, 'r') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error downloading model entry: {e}")
        return None
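
# Illustrative note (not from the original file): with the sanitization above,
# get_model_entry("some-org/guard-model", "Strict") would look for
# "entries/entry_some-org_guard-model_strict_v0.json" in RESULTS_DATASET_ID.
# The model name here is a made-up placeholder, not a real entry.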


def get_all_entries(version="v0", mode: Optional[str] = None) -> List[Dict]:
    """
    Get all model entries from the entries folder. If mode is provided, only return entries matching that mode.
    """
    try:
        api = HfApi(token=TOKEN)
        files = api.list_repo_files(repo_id=RESULTS_DATASET_ID, repo_type="dataset")
        if mode is not None:
            mode_safe = str(mode).replace("/", "_").replace(" ", "_").lower()
            entry_files = [
                f for f in files
                if f.startswith("entries/") and f"_{mode_safe}_" in f and f.endswith(f"_{version}.json")
            ]
        else:
            entry_files = [f for f in files if f.startswith("entries/") and f.endswith(f"_{version}.json")]

        entries = []
        for entry_file in entry_files:
            try:
                entry_path = hf_hub_download(
                    repo_id=RESULTS_DATASET_ID,
                    filename=entry_file,
                    repo_type="dataset",
                    token=TOKEN
                )
                with open(entry_path, 'r') as f:
                    entry_data = json.load(f)
                entries.append(entry_data)
            except Exception as e:
                print(f"Error loading entry {entry_file}: {e}")
        return entries
    except Exception as e:
        print(f"Error listing entries: {e}")
        return []


def get_leaderboard_df(version="v0") -> pd.DataFrame:
    """
    Get the leaderboard data as a DataFrame.
    """
    # Get latest leaderboard data
    leaderboard_data = get_latest_leaderboard(version)
    if not leaderboard_data:
        # If no leaderboard exists, try to build it from entries
        entries = get_all_entries(version)
        if entries:
            leaderboard_data = {
                "entries": entries,
                "last_updated": datetime.now().isoformat(),
                "version": version
            }
        else:
            # Return empty DataFrame if no data available
            return pd.DataFrame(columns=DISPLAY_COLS)

    # Convert to DataFrame
    return leaderboard_to_dataframe(leaderboard_data)
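
# Illustrative note (an assumption inferred from the code above, not an official schema):
# the leaderboard JSON handled here is expected to look roughly like
#   {"entries": [ {...per-model entry dict...}, ... ],
#    "last_updated": "<ISO timestamp>",
#    "version": "v0"}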


def get_category_leaderboard_df(category: str, version="v0") -> pd.DataFrame:
    """
    Get the leaderboard data filtered by a specific category.
    """
    # Get latest leaderboard data
    leaderboard_data = get_latest_leaderboard(version)
    if not leaderboard_data:
        # If no leaderboard exists, try to build it from entries
        entries = get_all_entries(version)
        if entries:
            leaderboard_data = {
                "entries": entries,
                "last_updated": datetime.now().isoformat(),
                "version": version
            }
        else:
            # Return empty DataFrame if no data available
            return pd.DataFrame(columns=DISPLAY_COLS)

    # Filter entries to only include those with data for the specified category
    filtered_entries = []
    for entry in leaderboard_data.get("entries", []):
        # Copy all base fields
        filtered_entry = {
            "model_name": entry.get("model_name", "Unknown Model"),
            "model_type": entry.get("model_type", "Unknown"),
            "guard_model_type": entry.get("guard_model_type", "Unknown"),
            "mode": entry.get("mode", "Strict"),
            "submission_date": entry.get("submission_date", ""),
            "version": entry.get("version", version),
            "base_model": entry.get("base_model", ""),
            "revision": entry.get("revision", ""),
            "precision": entry.get("precision", ""),
            "weight_type": entry.get("weight_type", "")
        }

        if "per_category_metrics" in entry and category in entry["per_category_metrics"]:
            category_metrics = entry["per_category_metrics"][category]

            # Add all metrics for each test type
            for test_type, metrics in category_metrics.items():
                if isinstance(metrics, dict):
                    for metric, value in metrics.items():
                        col_name = f"{test_type}_{metric}"
                        filtered_entry[col_name] = value
                        # Mirror binary F1 under the plain *_f1 column name as well
                        if metric == "f1_binary":
                            filtered_entry[f"{test_type}_f1"] = value

            # Calculate averages across the four test types
            f1_values = []
            recall_values = []
            precision_values = []
            accuracy_values = []
            category_recall_values = []
            total_samples = 0
            for test_type in ["default_prompts", "jailbreaked_prompts", "default_answers", "jailbreaked_answers"]:
                if test_type in category_metrics and isinstance(category_metrics[test_type], dict):
                    test_metrics = category_metrics[test_type]
                    if "f1_binary" in test_metrics and pd.notna(test_metrics["f1_binary"]):
                        f1_values.append(test_metrics["f1_binary"])
                    if "recall_binary" in test_metrics and pd.notna(test_metrics["recall_binary"]):
                        recall_values.append(test_metrics["recall_binary"])
                        category_recall_values.append(test_metrics["recall_binary"])
                    if "precision_binary" in test_metrics and pd.notna(test_metrics["precision_binary"]):
                        precision_values.append(test_metrics["precision_binary"])
                    if "accuracy" in test_metrics and pd.notna(test_metrics["accuracy"]):
                        accuracy_values.append(test_metrics["accuracy"])
                    if "sample_count" in test_metrics and pd.notna(test_metrics["sample_count"]):
                        total_samples += test_metrics["sample_count"]

            # Add overall averages
            if f1_values:
                filtered_entry["average_f1"] = sum(f1_values) / len(f1_values)
            if recall_values:
                filtered_entry["average_recall"] = sum(recall_values) / len(recall_values)
            if precision_values:
                filtered_entry["average_precision"] = sum(precision_values) / len(precision_values)

            # Add category-specific values to the standard macro metric keys
            if accuracy_values:
                filtered_entry["macro_accuracy"] = sum(accuracy_values) / len(accuracy_values)
            else:
                filtered_entry["macro_accuracy"] = np.nan
            if category_recall_values:
                filtered_entry["macro_recall"] = sum(category_recall_values) / len(category_recall_values)
            else:
                filtered_entry["macro_recall"] = np.nan
            if total_samples > 0:
                filtered_entry["total_evals_count"] = total_samples
            else:
                filtered_entry["total_evals_count"] = np.nan

            filtered_entries.append(filtered_entry)

    # Create a new leaderboard data structure with the filtered entries
    filtered_leaderboard = {
        "entries": filtered_entries,
        "last_updated": leaderboard_data.get("last_updated", datetime.now().isoformat()),
        "version": version
    }

    # Convert to DataFrame
    return leaderboard_to_dataframe(filtered_leaderboard)
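
# Note (added commentary): in the category view above, macro_accuracy and macro_recall
# are unweighted means over whichever of the four test types (default/jailbreaked
# prompts/answers) report valid values, and total_evals_count sums their sample counts.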


def get_detailed_model_data(model_name: str, mode: str, version="v0") -> Dict:
    """
    Get detailed data for a specific model and mode.
    """
    # Prefer the dedicated per-model entry file
    entry = get_model_entry(model_name, mode, version)
    if entry:
        return entry

    # Fall back to scanning the latest leaderboard snapshot
    leaderboard_data = get_latest_leaderboard(version)
    if leaderboard_data:
        for lb_entry in leaderboard_data.get("entries", []):
            if lb_entry.get("model_name") == model_name and str(lb_entry.get("mode")).lower() == str(mode).lower():
                return lb_entry

    return {}
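

# Minimal local smoke test: a sketch, not part of the original module. It assumes
# RESULTS_DATASET_ID and TOKEN are configured in src.envs and that CATEGORIES
# (imported above) is a list of category-name strings accepted by
# get_category_leaderboard_df.
if __name__ == "__main__":
    df = get_leaderboard_df(version="v0")
    print(f"Loaded leaderboard with {len(df)} rows")
    for category in list(CATEGORIES)[:1]:
        cat_df = get_category_leaderboard_df(category, version="v0")
        print(f"Category {category}: {len(cat_df)} rows")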