import json
import os

import pandas as pd
from loguru import logger

from display.formatting import make_clickable_model
from display.utils_old import EvalQueueColumn


def fetch_model_results(repo_dir: str, competition_type: str, eval_split: str) -> list[dict]:
    """Load all JSON model results under repo_dir/<competition_type>/<eval_split>."""
    model_results = []
    dirpath = os.path.join(repo_dir, competition_type, eval_split)
    for root, _, files in os.walk(dirpath):
        # Only consider leaf directories that contain exclusively JSON result files.
        if len(files) == 0 or not all(f.endswith(".json") for f in files):
            continue
        for file in files:
            filepath = os.path.join(root, file)
            try:
                with open(filepath, "r") as fp:
                    result = json.load(fp)
                model_results.append(result)
            except Exception as e:
                logger.error(f"Error loading model result from {filepath}: {e}")
                continue

    return model_results


def get_tossups_leaderboard_df(repo_dir: str, eval_split: str) -> pd.DataFrame:
    """Build the tossup leaderboard dataframe, sorted by average score."""
    model_results = fetch_model_results(repo_dir, "tossup", eval_split)

    eval_results = []
    for result in model_results:
        try:
            metrics = result["metrics"]
            username = result["username"]
            model_name = result["model_name"]
            buzz_accuracy = metrics["buzz_accuracy"]

            row = {
                "Submission": f"{username}/{model_name}",
                "Avg Score ⬆️": metrics["tossup_score"],
                "Buzz Accuracy": buzz_accuracy,
                "Buzz Position": metrics["buzz_position"],
            }
            # Human win rates are only available when the metrics include human buzz data.
            if "human_win_rate" in metrics:
                row["Win Rate w/ Humans"] = metrics["human_win_rate"]
                row["Win Rate w/ Humans (Aggressive)"] = metrics["human_win_rate_strict"]
            else:
                row["Win Rate w/ Humans"] = None
                row["Win Rate w/ Humans (Aggressive)"] = None
            eval_results.append(row)
        except Exception as e:
            # Use .get() so the log line works even if the failure happened before
            # username/model_name were read.
            submission = f"{result.get('username', '?')}/{result.get('model_name', '?')}"
            logger.error(f"Error processing model result '{submission}': {e}")
            continue

    df = pd.DataFrame(
        eval_results,
        columns=[
            "Submission",
            "Avg Score ⬆️",
            "Buzz Accuracy",
            "Buzz Position",
            "Win Rate w/ Humans",
            "Win Rate w/ Humans (Aggressive)",
        ],
    )
    df.sort_values(by="Avg Score ⬆️", ascending=False, inplace=True)
    return df


def get_bonuses_leaderboard_df(repo_dir: str, eval_split: str) -> pd.DataFrame:
    """Build the bonus leaderboard dataframe, sorted by question accuracy."""
    model_results = fetch_model_results(repo_dir, "bonus", eval_split)

    eval_results = []
    for result in model_results:
        try:
            metrics = result["metrics"]
            username = result["username"]
            model_name = result["model_name"]

            row = {
                "Submission": f"{username}/{model_name}",
                "Question Accuracy": metrics["question_accuracy"],
                "Part Accuracy": metrics["part_accuracy"],
            }
            eval_results.append(row)
        except Exception as e:
            # Use .get() so the log line works even if the failure happened before
            # username/model_name were read.
            submission = f"{result.get('username', '?')}/{result.get('model_name', '?')}"
            logger.error(f"Error processing model result '{submission}': {e}")
            continue

    df = pd.DataFrame(
        eval_results,
        columns=["Submission", "Question Accuracy", "Part Accuracy"],
    )
    df.sort_values(by="Question Accuracy", ascending=False, inplace=True)
    return df


def get_evaluation_queue_df(save_path: str, cols: list) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Create the finished, running, and pending dataframes for the requested evaluation queues."""
    entries = [entry for entry in os.listdir(save_path) if not entry.startswith(".")]
    all_evals = []

    for entry in entries:
        if entry.endswith(".json"):
            file_path = os.path.join(save_path, entry)
            with open(file_path) as fp:
                data = json.load(fp)

            data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
            data[EvalQueueColumn.revision.name] = data.get("revision", "main")

            all_evals.append(data)
        elif ".md" not in entry:
            # Remaining entries are directories holding per-model request files.
            sub_dir = os.path.join(save_path, entry)
            sub_entries = [
                e for e in os.listdir(sub_dir) if os.path.isfile(os.path.join(sub_dir, e)) and not e.startswith(".")
            ]
            for sub_entry in sub_entries:
                file_path = os.path.join(sub_dir, sub_entry)
                with open(file_path) as fp:
                    data = json.load(fp)

                data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
                data[EvalQueueColumn.revision.name] = data.get("revision", "main")
                all_evals.append(data)

    pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]]
    running_list = [e for e in all_evals if e["status"] == "RUNNING"]
    finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"]
    df_pending = pd.DataFrame.from_records(pending_list, columns=cols)
    df_running = pd.DataFrame.from_records(running_list, columns=cols)
    df_finished = pd.DataFrame.from_records(finished_list, columns=cols)
    return df_finished[cols], df_running[cols], df_pending[cols]
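

# --- Usage sketch (assumption, not part of the leaderboard app itself) -----------
# A minimal local smoke test of the leaderboard builders above. The repository
# path and eval split name below are hypothetical placeholders, not values taken
# from the real deployment.
if __name__ == "__main__":
    repo_dir = "data/leaderboard-results"  # hypothetical local checkout of the results repo
    eval_split = "test"                    # hypothetical split name

    tossup_df = get_tossups_leaderboard_df(repo_dir, eval_split)
    bonus_df = get_bonuses_leaderboard_df(repo_dir, eval_split)
    logger.info(f"Loaded {len(tossup_df)} tossup and {len(bonus_df)} bonus submissions")
    print(tossup_df.head())
    print(bonus_df.head())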