import json
import os

import pandas as pd
from loguru import logger

from display.formatting import make_clickable_model
from display.utils_old import EvalQueueColumn


def _submission_label(result: object) -> str:
    """Return a best-effort ``username/model_name`` label for log messages.

    Unlike reading local variables assigned inside a ``try`` body, this never
    raises and never reports a previous record's name: missing fields (or a
    record that is not a dict at all) are rendered as ``<unknown>``.
    """
    if not isinstance(result, dict):
        return "<unknown>/<unknown>"
    username = result.get("username", "<unknown>")
    model_name = result.get("model_name", "<unknown>")
    return f"{username}/{model_name}"


def fetch_model_results(repo_dir: str, competition_type: str, eval_split: str) -> list[dict]:
    """Load every JSON result file found under ``repo_dir/competition_type/eval_split``.

    The directory tree is walked recursively. A directory is only read when it
    contains at least one file and *all* of its files end in ``.json``;
    directories holding any other file are skipped entirely.

    Args:
        repo_dir: Root directory of the results repository.
        competition_type: Sub-directory of ``repo_dir`` to read from.
        eval_split: Sub-directory of ``competition_type`` to read from.

    Returns:
        The parsed content of each readable JSON file. Files that cannot be
        opened or parsed are logged and skipped.
    """
    model_results = []
    dirpath = os.path.join(repo_dir, competition_type, eval_split)
    for root, _, files in os.walk(dirpath):
        # Skip empty directories and directories mixing JSON with other files.
        if not files or not all(f.endswith(".json") for f in files):
            continue
        for file in files:
            filepath = os.path.join(root, file)
            try:
                with open(filepath, "r") as fp:
                    result = json.load(fp)
            except Exception as e:
                # Best effort: one unreadable file must not hide the others.
                logger.error(f"Error loading model result from {filepath}: {e}")
                continue
            model_results.append(result)

    return model_results


def get_tossups_leaderboard_df(repo_dir: str, eval_split: str) -> pd.DataFrame:
    """Build the leaderboard table from the ``"tossup"`` results.

    Args:
        repo_dir: Root directory of the results repository.
        eval_split: Evaluation split whose results should be loaded.

    Returns:
        One row per valid result, sorted by ``"Avg Score ⬆️"`` in descending
        order. The two human win-rate columns are ``None`` for results whose
        metrics have no ``"human_win_rate"`` key. Results that are missing a
        required field are logged and left out.
    """
    model_results = fetch_model_results(repo_dir, "tossup", eval_split)

    eval_results = []
    for result in model_results:
        try:
            metrics = result["metrics"]
            username = result["username"]
            model_name = result["model_name"]
            row = {
                "Submission": f"{username}/{model_name}",
                "Avg Score ⬆️": metrics["tossup_score"],
                "Buzz Accuracy": metrics["buzz_accuracy"],
                "Buzz Position": metrics["buzz_position"],
            }
            if "human_win_rate" in metrics:
                row["Win Rate w/ Humans"] = metrics["human_win_rate"]
                row["Win Rate w/ Humans (Aggressive)"] = metrics["human_win_rate_strict"]
            else:
                row["Win Rate w/ Humans"] = None
                row["Win Rate w/ Humans (Aggressive)"] = None
        except Exception as e:
            # The label is derived from the record itself: the names assigned
            # in the try body may be unbound (or left over from the previous
            # record) when the failure happens early.
            logger.error(f"Error processing model result '{_submission_label(result)}': {e}")
            continue
        eval_results.append(row)

    df = pd.DataFrame(
        eval_results,
        columns=[
            "Submission",
            "Avg Score ⬆️",
            "Buzz Accuracy",
            "Buzz Position",
            "Win Rate w/ Humans",
            "Win Rate w/ Humans (Aggressive)",
        ],
    )
    df.sort_values(by="Avg Score ⬆️", ascending=False, inplace=True)
    return df


def get_bonuses_leaderboard_df(repo_dir: str, eval_split: str) -> pd.DataFrame:
    """Build the leaderboard table from the ``"bonus"`` results.

    Args:
        repo_dir: Root directory of the results repository.
        eval_split: Evaluation split whose results should be loaded.

    Returns:
        One row per valid result, sorted by ``"Question Accuracy"`` in
        descending order. Results that are missing a required field are
        logged and left out.
    """
    model_results = fetch_model_results(repo_dir, "bonus", eval_split)

    eval_results = []
    for result in model_results:
        try:
            metrics = result["metrics"]
            username = result["username"]
            model_name = result["model_name"]
            row = {
                "Submission": f"{username}/{model_name}",
                "Question Accuracy": metrics["question_accuracy"],
                "Part Accuracy": metrics["part_accuracy"],
            }
        except Exception as e:
            # See get_tossups_leaderboard_df: never read try-body locals here.
            logger.error(f"Error processing model result '{_submission_label(result)}': {e}")
            continue
        eval_results.append(row)

    df = pd.DataFrame(
        eval_results,
        columns=["Submission", "Question Accuracy", "Part Accuracy"],
    )
    df.sort_values(by="Question Accuracy", ascending=False, inplace=True)
    return df


def _load_queue_entry(file_path: str) -> dict:
    """Read one evaluation-request JSON file and add its display fields.

    The model column is set from ``make_clickable_model(data["model"])`` and
    the revision column defaults to ``"main"`` when the file has no
    ``"revision"`` key.
    """
    with open(file_path) as fp:
        data = json.load(fp)

    data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
    data[EvalQueueColumn.revision.name] = data.get("revision", "main")
    return data


def get_evaluation_queue_df(
    save_path: str, cols: list
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    # TODO: This function is stale, but might be a good reference point for new implementation
    """Create the dataframes for the evaluation queue requests.

    Request files are read from ``save_path`` itself (entries whose name
    contains ``.json``) and from its direct sub-folders (every non-hidden
    file). Entries whose name contains ``.md`` and hidden entries are ignored.

    Args:
        save_path: Directory holding the evaluation request files.
        cols: Column names to keep in each returned dataframe.

    Returns:
        A ``(finished, running, pending)`` tuple of dataframes, split on each
        request's ``"status"`` value:
        finished is ``FINISHED*`` or ``PENDING_NEW_EVAL``, running is
        ``RUNNING``, pending is ``PENDING`` or ``RERUN``.
    """
    entries = [entry for entry in os.listdir(save_path) if not entry.startswith(".")]
    all_evals = []

    for entry in entries:
        entry_path = os.path.join(save_path, entry)
        if ".json" in entry:
            all_evals.append(_load_queue_entry(entry_path))
        elif ".md" not in entry and os.path.isdir(entry_path):
            # The file check must use the full path: a bare name would be
            # resolved against the current working directory instead of the
            # sub-folder, silently dropping every request stored in it.
            sub_entries = [
                e
                for e in os.listdir(entry_path)
                if os.path.isfile(os.path.join(entry_path, e)) and not e.startswith(".")
            ]
            for sub_entry in sub_entries:
                all_evals.append(_load_queue_entry(os.path.join(entry_path, sub_entry)))

    pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]]
    running_list = [e for e in all_evals if e["status"] == "RUNNING"]
    finished_list = [
        e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"
    ]
    df_pending = pd.DataFrame.from_records(pending_list, columns=cols)
    df_running = pd.DataFrame.from_records(running_list, columns=cols)
    df_finished = pd.DataFrame.from_records(finished_list, columns=cols)
    return df_finished[cols], df_running[cols], df_pending[cols]