circle-guard-bench / src /populate.py
apsys's picture
submodule + versioning
3c01baa
raw
history blame
10.9 kB
"""
Populate the GuardBench leaderboard from HuggingFace datasets.
"""
import json
import os
import pandas as pd
import tempfile
from typing import Dict, Tuple, List
from glob import glob
from huggingface_hub import snapshot_download, hf_hub_download, HfApi
from datasets import load_dataset
from src.display.utils import GUARDBENCH_COLUMN, DISPLAY_COLS, CATEGORIES
from src.envs import RESULTS_DATASET_ID, TOKEN, LEADERBOARD_FILE, CACHE_PATH
from src.leaderboard.processor import leaderboard_to_dataframe, load_leaderboard_data, save_leaderboard_data, process_jsonl_submission, add_entries_to_leaderboard
def get_versioned_leaderboard_file(version="v0"):
"""
Get the versioned leaderboard file path.
"""
base_name, ext = os.path.splitext(LEADERBOARD_FILE)
return f"{base_name}_{version}{ext}"
def download_leaderboard_data(version="v0") -> bool:
"""
Download the latest leaderboard data from HuggingFace.
Args:
version: The dataset version to download
"""
try:
# Create a temporary directory to download the submissions
temp_dir = os.path.join(CACHE_PATH, f"temp_submissions_{version}")
os.makedirs(temp_dir, exist_ok=True)
# Get the versioned leaderboard file
leaderboard_file = get_versioned_leaderboard_file(version)
# Download the entire repository
try:
snapshot_path = snapshot_download(
repo_id=RESULTS_DATASET_ID,
repo_type="dataset",
local_dir=temp_dir,
token=TOKEN,
ignore_patterns=["*.md", ".*"],
etag_timeout=30
)
# Process all submission files
all_entries = []
submission_files = []
# Look for submission files in the submissions directory
submissions_dir = os.path.join(snapshot_path, "submissions")
version_submissions_dir = os.path.join(snapshot_path, f"submissions_{version}")
# Check both standard and versioned submission directories
if os.path.exists(submissions_dir):
submission_files.extend(glob(os.path.join(submissions_dir, "*.jsonl")))
if os.path.exists(version_submissions_dir):
submission_files.extend(glob(os.path.join(version_submissions_dir, "*.jsonl")))
# Also look for any versioned JSONL files in the root
submission_files.extend(glob(os.path.join(snapshot_path, f"*_{version}.jsonl")))
# If we're looking for v0 and no versioned files found, use generic ones
if version == "v0" and not submission_files:
submission_files.extend(glob(os.path.join(snapshot_path, "*.jsonl")))
# Process each submission file
for file_path in submission_files:
entries, _ = process_jsonl_submission(file_path)
# Filter entries to those that match the version or don't have version specified
filtered_entries = [
entry for entry in entries
if entry.get("version", "v0") == version or "version" not in entry
]
all_entries.extend(filtered_entries)
# Create leaderboard data structure
leaderboard_data = {
"entries": all_entries,
"last_updated": pd.Timestamp.now().isoformat(),
"version": version
}
# Save to local file
save_leaderboard_data(leaderboard_data, leaderboard_file)
return True
except Exception as e:
print(f"Error downloading repository: {e}")
# If we can't download the repository, try to download individual files
try:
api = HfApi(token=TOKEN)
files = api.list_repo_files(repo_id=RESULTS_DATASET_ID, repo_type="dataset")
# Look for versioned and regular files
submission_files = [
f for f in files
if (f.endswith(f'_{version}.jsonl') or
f.startswith(f'submissions_{version}/') or
(version == "v0" and f.endswith('.jsonl')))
]
all_entries = []
for file_path in submission_files:
try:
local_path = hf_hub_download(
repo_id=RESULTS_DATASET_ID,
filename=file_path,
repo_type="dataset",
token=TOKEN
)
entries, _ = process_jsonl_submission(local_path)
# Filter entries to those that match the version or don't have version specified
filtered_entries = [
entry for entry in entries
if entry.get("version", "v0") == version or "version" not in entry
]
all_entries.extend(filtered_entries)
except Exception as file_error:
print(f"Error downloading file {file_path}: {file_error}")
# Create leaderboard data structure
leaderboard_data = {
"entries": all_entries,
"last_updated": pd.Timestamp.now().isoformat(),
"version": version
}
# Save to local file
save_leaderboard_data(leaderboard_data, leaderboard_file)
return True
except Exception as list_error:
print(f"Error listing repository files: {list_error}")
# If we can't download anything, create an empty leaderboard
if not os.path.exists(leaderboard_file):
empty_data = {
"entries": [],
"last_updated": pd.Timestamp.now().isoformat(),
"version": version
}
save_leaderboard_data(empty_data, leaderboard_file)
return False
except Exception as e:
print(f"Error downloading leaderboard data: {e}")
# Ensure we have at least an empty leaderboard file
leaderboard_file = get_versioned_leaderboard_file(version)
if not os.path.exists(leaderboard_file):
empty_data = {
"entries": [],
"last_updated": pd.Timestamp.now().isoformat(),
"version": version
}
save_leaderboard_data(empty_data, leaderboard_file)
return False
def get_leaderboard_df(version="v0") -> pd.DataFrame:
"""
Get the leaderboard data as a DataFrame.
Args:
version: The dataset version to retrieve
"""
# Try to download the latest data
download_leaderboard_data(version=version)
# Load from local file
leaderboard_file = get_versioned_leaderboard_file(version)
leaderboard_data = load_leaderboard_data(leaderboard_file)
# Convert to DataFrame
df = leaderboard_to_dataframe(leaderboard_data)
return df
def get_category_leaderboard_df(category: str, version="v0") -> pd.DataFrame:
"""
Get the leaderboard data filtered by a specific category.
Args:
category: The category to filter by (e.g., "Criminal, Violent, and Terrorist Activity")
version: The dataset version to retrieve
Returns:
DataFrame with metrics for the specified category
"""
# Load the leaderboard data
leaderboard_file = get_versioned_leaderboard_file(version)
leaderboard_data = load_leaderboard_data(leaderboard_file)
# Filter entries to only include those with data for the specified category
filtered_entries = []
for entry in leaderboard_data.get("entries", []):
# Check if the entry has data for this category
if "per_category_metrics" in entry and category in entry["per_category_metrics"]:
# Create a new entry with just the overall info and this category's metrics
filtered_entry = {
"model_name": entry.get("model_name", "Unknown Model"),
"model_type": entry.get("model_type", "Unknown"),
"submission_date": entry.get("submission_date", ""),
"version": entry.get("version", version),
}
# Extract metrics for this category
category_metrics = entry["per_category_metrics"][category]
# Add metrics for each test type
for test_type in category_metrics:
if test_type and isinstance(category_metrics[test_type], dict):
for metric, value in category_metrics[test_type].items():
col_name = f"{test_type}_{metric}"
filtered_entry[col_name] = value
# Calculate average F1 for this category
f1_values = []
for test_type in category_metrics:
if test_type and isinstance(category_metrics[test_type], dict) and "f1_binary" in category_metrics[test_type]:
f1_values.append(category_metrics[test_type]["f1_binary"])
if f1_values:
filtered_entry["average_f1"] = sum(f1_values) / len(f1_values)
# Add specific test type F1 scores for display
for test_type in ["default_prompts", "jailbreaked_prompts", "default_answers", "jailbreaked_answers"]:
if test_type in category_metrics and "f1_binary" in category_metrics[test_type]:
filtered_entry[f"{test_type}_f1"] = category_metrics[test_type]["f1_binary"]
filtered_entries.append(filtered_entry)
# Create a new leaderboard data structure with the filtered entries
filtered_leaderboard = {
"entries": filtered_entries,
"last_updated": leaderboard_data.get("last_updated", pd.Timestamp.now().isoformat()),
"version": version
}
# Convert to DataFrame
df = leaderboard_to_dataframe(filtered_leaderboard)
return df
def get_detailed_model_data(model_name: str, version="v0") -> Dict:
"""
Get detailed data for a specific model.
Args:
model_name: The name of the model to get data for
version: The dataset version to retrieve
"""
leaderboard_file = get_versioned_leaderboard_file(version)
leaderboard_data = load_leaderboard_data(leaderboard_file)
for entry in leaderboard_data.get("entries", []):
# Check both the model name and version
entry_version = entry.get("version", "v0")
if entry.get("model_name") == model_name and (entry_version == version or entry_version is None):
return entry
return {}