circle-guard-bench / src /populate.py
apsys's picture
mode fix
a7eca29
raw
history blame
9.81 kB
"""
Populate the GuardBench leaderboard from HuggingFace datasets.
"""
import json
import os
import pandas as pd
import tempfile
from typing import Dict, List, Optional
from datetime import datetime
import numpy as np
from huggingface_hub import hf_hub_download, HfApi
from datasets import load_dataset
from src.display.utils import GUARDBENCH_COLUMN, DISPLAY_COLS, CATEGORIES
from src.envs import RESULTS_DATASET_ID, TOKEN, CACHE_PATH
from src.leaderboard.processor import leaderboard_to_dataframe
def get_latest_leaderboard(version="v0") -> Optional[Dict]:
"""
Get the latest leaderboard data from HuggingFace dataset.
"""
try:
# Try to download the leaderboard file
leaderboard_path = hf_hub_download(
repo_id=RESULTS_DATASET_ID,
filename=f"leaderboards/leaderboard_{version}.json",
repo_type="dataset",
token=TOKEN
)
with open(leaderboard_path, 'r') as f:
return json.load(f)
except Exception as e:
print(f"Error downloading leaderboard: {e}")
return None
def get_model_entry(model_name: str, mode: str, version="v0") -> Optional[Dict]:
"""
Get a specific model's entry from the entries folder, uniquely identified by model_name, mode, and version.
"""
try:
model_name_safe = model_name.replace("/", "_").replace(" ", "_")
mode_safe = str(mode).replace("/", "_").replace(" ", "_").lower()
entry_path = hf_hub_download(
repo_id=RESULTS_DATASET_ID,
filename=f"entries/entry_{model_name_safe}_{mode_safe}_{version}.json",
repo_type="dataset",
token=TOKEN
)
with open(entry_path, 'r') as f:
return json.load(f)
except Exception as e:
print(f"Error downloading model entry: {e}")
return None
def get_all_entries(version="v0", mode: str = None) -> List[Dict]:
"""
Get all model entries from the entries folder. If mode is provided, only return entries matching that mode.
"""
try:
api = HfApi(token=TOKEN)
files = api.list_repo_files(repo_id=RESULTS_DATASET_ID, repo_type="dataset")
if mode is not None:
mode_safe = str(mode).replace("/", "_").replace(" ", "_").lower()
entry_files = [f for f in files if f.startswith("entries/") and f"_{mode_safe}_" in f and f.endswith(f"_{version}.json")]
else:
entry_files = [f for f in files if f.startswith("entries/") and f.endswith(f"_{version}.json")]
entries = []
for entry_file in entry_files:
try:
entry_path = hf_hub_download(
repo_id=RESULTS_DATASET_ID,
filename=entry_file,
repo_type="dataset",
token=TOKEN
)
with open(entry_path, 'r') as f:
entry_data = json.load(f)
entries.append(entry_data)
except Exception as e:
print(f"Error loading entry {entry_file}: {e}")
return entries
except Exception as e:
print(f"Error listing entries: {e}")
return []
def get_leaderboard_df(version="v0") -> pd.DataFrame:
"""
Get the leaderboard data as a DataFrame.
"""
# Get latest leaderboard data
leaderboard_data = get_latest_leaderboard(version)
if not leaderboard_data:
# If no leaderboard exists, try to build it from entries
entries = get_all_entries(version)
if entries:
leaderboard_data = {
"entries": entries,
"last_updated": datetime.now().isoformat(),
"version": version
}
else:
# Return empty DataFrame if no data available
return pd.DataFrame(columns=DISPLAY_COLS)
# Convert to DataFrame
return leaderboard_to_dataframe(leaderboard_data)
def get_category_leaderboard_df(category: str, version="v0") -> pd.DataFrame:
"""
Get the leaderboard data filtered by a specific category.
"""
# Get latest leaderboard data
leaderboard_data = get_latest_leaderboard(version)
if not leaderboard_data:
# If no leaderboard exists, try to build it from entries
entries = get_all_entries(version)
if entries:
leaderboard_data = {
"entries": entries,
"last_updated": datetime.now().isoformat(),
"version": version
}
else:
# Return empty DataFrame if no data available
return pd.DataFrame(columns=DISPLAY_COLS)
# Filter entries to only include those with data for the specified category
filtered_entries = []
for entry in leaderboard_data.get("entries", []):
# Copy all base fields
filtered_entry = {
"model_name": entry.get("model_name", "Unknown Model"),
"model_type": entry.get("model_type", "Unknown"),
"guard_model_type": entry.get("guard_model_type", "Unknown"),
"mode": entry.get("mode", "Strict"),
"submission_date": entry.get("submission_date", ""),
"version": entry.get("version", version),
"base_model": entry.get("base_model", ""),
"revision": entry.get("revision", ""),
"precision": entry.get("precision", ""),
"weight_type": entry.get("weight_type", "")
}
if "per_category_metrics" in entry and category in entry["per_category_metrics"]:
category_metrics = entry["per_category_metrics"][category]
# Add all metrics for each test type
for test_type, metrics in category_metrics.items():
if isinstance(metrics, dict):
for metric, value in metrics.items():
col_name = f"{test_type}_{metric}"
filtered_entry[col_name] = value
# Also add the non-binary version for F1 scores
if metric == "f1_binary":
filtered_entry[f"{test_type}_f1"] = value
# Calculate averages
f1_values = []
recall_values = []
precision_values = []
accuracy_values = []
category_recall_values = []
total_samples = 0
for test_type in ["default_prompts", "jailbreaked_prompts", "default_answers", "jailbreaked_answers"]:
if test_type in category_metrics and isinstance(category_metrics[test_type], dict):
test_metrics = category_metrics[test_type]
if "f1_binary" in test_metrics and pd.notna(test_metrics["f1_binary"]):
f1_values.append(test_metrics["f1_binary"])
if "recall_binary" in test_metrics and pd.notna(test_metrics["recall_binary"]):
recall_values.append(test_metrics["recall_binary"])
category_recall_values.append(test_metrics["recall_binary"])
if "precision_binary" in test_metrics and pd.notna(test_metrics["precision_binary"]):
precision_values.append(test_metrics["precision_binary"])
if "accuracy" in test_metrics and pd.notna(test_metrics["accuracy"]):
accuracy_values.append(test_metrics["accuracy"])
if "sample_count" in test_metrics and pd.notna(test_metrics["sample_count"]):
total_samples += test_metrics["sample_count"]
# print(f"F1 values: {f1_values}")
# print(f1_values, recall_values, precision_values, accuracy_values, total_samples)
# Add overall averages
if f1_values:
filtered_entry["average_f1"] = sum(f1_values) / len(f1_values)
if recall_values:
filtered_entry["average_recall"] = sum(recall_values) / len(recall_values)
if precision_values:
filtered_entry["average_precision"] = sum(precision_values) / len(precision_values)
# Add category-specific values to standard macro metric keys
if accuracy_values:
filtered_entry["macro_accuracy"] = sum(accuracy_values) / len(accuracy_values)
else:
filtered_entry["macro_accuracy"] = np.nan
if category_recall_values:
filtered_entry["macro_recall"] = sum(category_recall_values) / len(category_recall_values)
else:
filtered_entry["macro_recall"] = np.nan
if total_samples > 0:
filtered_entry["total_evals_count"] = total_samples
else:
filtered_entry["total_evals_count"] = np.nan
filtered_entries.append(filtered_entry)
# Create a new leaderboard data structure with the filtered entries
filtered_leaderboard = {
"entries": filtered_entries,
"last_updated": leaderboard_data.get("last_updated", datetime.now().isoformat()),
"version": version
}
# print(filtered_leaderboard)
# Convert to DataFrame
return leaderboard_to_dataframe(filtered_leaderboard)
def get_detailed_model_data(model_name: str, mode: str, version="v0") -> Dict:
"""
Get detailed data for a specific model and mode.
"""
entry = get_model_entry(model_name, mode, version)
if entry:
return entry
leaderboard_data = get_latest_leaderboard(version)
if leaderboard_data:
for entry in leaderboard_data.get("entries", []):
if entry.get("model_name") == model_name and str(entry.get("mode")).lower() == str(mode).lower():
return entry
return {}