import gradio as gr
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from glob import glob
from matplotlib.colors import ListedColormap, BoundaryNorm
# Load text benchmark results
noncot_results = glob("results/*.pkl")
noncot_results_qwen = glob("results_qwen/*.pkl")

# Load vision benchmark results
vision_results = glob("results-vision/*.pkl")

# Load CoT text benchmark results
cot_text_results = glob("results-cot/*.pkl")

# Load CoT vision benchmark results
# cot_vision_results = glob("results-vision-CoT/*.pkl")
# Function to load data, adding the model type and model name to each row
def load_data(files, model_type):
    data = []
    for file in files:
        df = pd.read_pickle(file)
        df["Model Type"] = model_type
        df["Model Name"] = file.split("/")[-1].replace(".pkl", "")
        data.append(df)
    return pd.concat(data, ignore_index=True)
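# Example (illustrative; the file name "results/gpt-4.pkl" is hypothetical):
# load_data(["results/gpt-4.pkl"], "Text Only") returns the pickled DataFrame
# with two extra columns, Model Type == "Text Only" and Model Name == "gpt-4".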
# Load and label all data
data = load_data(noncot_results, "Text Only")
data_qwen = load_data(noncot_results_qwen, "Text Only")
vision_data = load_data(vision_results, "Vision")
cot_text_data = load_data(cot_text_results, "CoT Text Only")
# cot_vision_data = load_data(cot_vision_results, "CoT Vision")

# Combine all data into a single DataFrame
all_data = pd.concat([data_qwen, vision_data, cot_text_data], ignore_index=True)

all_model_names = all_data["Model Name"].unique()
all_text_only_model_names = list(
    all_data[all_data["Model Type"] == "Text Only"]["Model Name"].unique()
)
all_cot_text_only_models = list(
    all_data[all_data["Model Type"] == "CoT Text Only"]["Model Name"].unique()
)

# Raw per-query rows for the constraint views, populated by the callbacks below
text_only_filtered_raw = None
text_only_filtered_raw_cot = None
## Continue with the old code --
# TODO: Update me to read from all_data later

# Load the pickle files into a dict keyed by file name
data = {file: pd.read_pickle(file) for file in noncot_results}

# Load the vision files into a dict
vision_data = {file: pd.read_pickle(file) for file in vision_results}

# Load the CoT text files into a dict
cot_text_data = {file: pd.read_pickle(file) for file in cot_text_results}

# Load the CoT vision files into a dict
# cot_vision_data = {file: pd.read_pickle(file) for file in cot_vision_results}

data_qwen = {file: pd.read_pickle(file) for file in noncot_results_qwen}

intersection_df = pd.read_pickle(
    "./intersection_results/gpt-3.5-judge-by_Qwen_5times_intersection_subset_1.pkl"
)
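# Note: intersection_df is assumed to carry the same per-query schema as the
# benchmark pickles (model_name, parsed_judge_response, num_states,
# num_alphabet, substring_index), since the heatmap code below relies on it.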
# Accuracy for each model
intersection_df_acc = (
    intersection_df.groupby("model_name")["parsed_judge_response"].mean().reset_index()
)
intersection_df_acc["Accuracy"] = intersection_df_acc["parsed_judge_response"] * 100
intersection_df_acc.drop("parsed_judge_response", axis=1, inplace=True)
intersection_df_acc.sort_values("Accuracy", ascending=False, inplace=True)
def calculate_accuracy(df):
    return df["parsed_judge_response"].mean() * 100


def accuracy_breakdown(df):
    # Accuracy at each of the 4 difficulty levels
    return (df.groupby("difficulty_level")["parsed_judge_response"].mean() * 100).values
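# parsed_judge_response is assumed to be binary (1 = judged correct,
# 0 = judged incorrect), so the mean is the fraction correct and *100
# turns it into a percentage.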
# Define the column names with icons
headers_with_icons = [
    "🤖 Model Name",
    "⭐ Overall",
    "🥇 Level 1",
    "🥈 Level 2",
    "🥉 Level 3",
    "🔬 Level 4",
]

column_names = [
    "Model Name",
    "Overall Accuracy",
    "Level 1 Accuracy",
    "Level 2 Accuracy",
    "Level 3 Accuracy",
    "Level 4 Accuracy",
]
# Function to process data
def process_data(data):
    data_for_df = []
    for file, df in data.items():
        overall_accuracy = round(calculate_accuracy(df), 2)
        breakdown_accuracy = [round(acc, 2) for acc in accuracy_breakdown(df)]
        model_name = file.split("/")[-1].replace(".pkl", "")
        data_for_df.append([model_name, overall_accuracy] + breakdown_accuracy)
    return data_for_df
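# Each row has the shape [model_name, overall, level_1, level_2, level_3,
# level_4], matching column_names above.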
# Process all data
text_data_for_df = process_data(data)
text_data_for_df_qwen = process_data(data_qwen)
vision_data_for_df = process_data(vision_data)
cot_text_data_for_df = process_data(cot_text_data)
# cot_vision_data_for_df = process_data(cot_vision_data)

# Create DataFrames
accuracy_df = pd.DataFrame(text_data_for_df, columns=column_names)
accuracy_df_qwen = pd.DataFrame(text_data_for_df_qwen, columns=column_names)
vision_accuracy_df = pd.DataFrame(vision_data_for_df, columns=column_names)
cot_text_accuracy_df = pd.DataFrame(cot_text_data_for_df, columns=column_names)
# cot_vision_accuracy_df = pd.DataFrame(cot_vision_data_for_df, columns=column_names)
# Function to finalize DataFrame
def finalize_df(df):
    df = df.round(1)  # Round to one decimal place
    # Sort on the numeric column before formatting; sorting the formatted
    # strings would be lexicographic (e.g. "100.0" would rank below "99.0")
    df.sort_values(by="Overall Accuracy", ascending=False, inplace=True)
    df = df.applymap(lambda x: f"{x:.1f}" if isinstance(x, (int, float)) else x)
    df.columns = headers_with_icons
    # Add a rank column and bring it to the first position
    df["#"] = range(1, len(df) + 1)
    cols = df.columns.tolist()
    df = df[cols[-1:] + cols[:-1]]
    return df
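# Example (illustrative, hypothetical model name): a row ["gpt-4", 85.27, ...]
# becomes [1, "gpt-4", "85.3", ...] if it has the highest overall accuracy.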
# Finalize all DataFrames
accuracy_df = finalize_df(accuracy_df)
accuracy_df_qwen = finalize_df(accuracy_df_qwen)
vision_accuracy_df = finalize_df(vision_accuracy_df)
cot_text_accuracy_df = finalize_df(cot_text_accuracy_df)
# cot_vision_accuracy_df = finalize_df(cot_vision_accuracy_df)
def load_heatmap(evt: gr.SelectData):
    return gr.Image(f"results/{evt.value}.jpg")


def load_heatmap_qwen(evt: gr.SelectData):
    return gr.Image(f"results_qwen/{evt.value}.jpg")


def load_vision_heatmap(evt: gr.SelectData):
    return gr.Image(f"results-vision/{evt.value}.jpg")


def load_cot_heatmap(evt: gr.SelectData):
    return gr.Image(f"results-cot/{evt.value}.jpg")


def load_cot_vision_heatmap(evt: gr.SelectData):
    return gr.Image(f"results-vision-CoT/{evt.value}.jpg")
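# These loaders assume a pre-rendered heatmap image named <model>.jpg sits next
# to each <model>.pkl; clicking a model-name cell passes that name as evt.value.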
def calculate_order_by_first_substring(selected_models):
    global text_only_filtered_raw
    # Queries on the first substring only
    first_columns = all_data[all_data["substring_index"] == 1]
    query_ids_df = first_columns[first_columns["Model Type"] == "Text Only"]
    query_ids_df = query_ids_df[query_ids_df["Model Name"].isin(selected_models)]
    # Keep only queries that every selected model answered correctly
    query_ids_df = query_ids_df.groupby("query_id").filter(
        lambda x: x["parsed_judge_response"].eq(1).all()
    )
    fsm_ids = query_ids_df.fsm_id.unique()
    text_only = all_data[all_data["Model Type"] == "Text Only"]
    text_only_filtered = text_only[text_only["fsm_id"].isin(fsm_ids)]
    text_only_filtered_raw = text_only_filtered.copy()
    query_ids = text_only_filtered.query_id.unique()
    text_only_filtered = (
        text_only_filtered.groupby(["Model Name"])["parsed_judge_response"]
        .mean()
        .reset_index()
    )
    text_only_filtered["Accuracy"] = text_only_filtered["parsed_judge_response"] * 100
    text_only_filtered.drop("parsed_judge_response", axis=1, inplace=True)
    text_only_filtered["Accuracy"] = text_only_filtered["Accuracy"].round(2)
    text_only_filtered.sort_values("Accuracy", ascending=False, inplace=True)
    number_of_queries = len(query_ids)
    number_of_fsms = len(fsm_ids)
    return text_only_filtered, number_of_queries, number_of_fsms
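# In other words: the leaderboard is restricted to FSMs whose first-substring
# query was solved by all selected models, and each model's accuracy is then
# recomputed over every query on those FSMs.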
# CoT variant of the function above; identical logic, restricted to rows with
# Model Type == "CoT Text Only"
def calculate_order_by_first_substring_cot(selected_models):
    global text_only_filtered_raw_cot
    first_columns = all_data[all_data["substring_index"] == 1]
    query_ids_df = first_columns[first_columns["Model Type"] == "CoT Text Only"]
    query_ids_df = query_ids_df[query_ids_df["Model Name"].isin(selected_models)]
    query_ids_df = query_ids_df.groupby("query_id").filter(
        lambda x: x["parsed_judge_response"].eq(1).all()
    )
    fsm_ids = query_ids_df.fsm_id.unique()
    text_only = all_data[all_data["Model Type"] == "CoT Text Only"]
    text_only_filtered = text_only[text_only["fsm_id"].isin(fsm_ids)]
    text_only_filtered_raw_cot = text_only_filtered.copy()
    query_ids = text_only_filtered.query_id.unique()
    text_only_filtered = (
        text_only_filtered.groupby(["Model Name"])["parsed_judge_response"]
        .mean()
        .reset_index()
    )
    text_only_filtered["Accuracy"] = text_only_filtered["parsed_judge_response"] * 100
    text_only_filtered.drop("parsed_judge_response", axis=1, inplace=True)
    text_only_filtered["Accuracy"] = text_only_filtered["Accuracy"].round(2)
    text_only_filtered.sort_values("Accuracy", ascending=False, inplace=True)
    number_of_queries = len(query_ids)
    number_of_fsms = len(fsm_ids)
    return text_only_filtered, number_of_queries, number_of_fsms
def generate_heatmap_for_specific_model(model_name):
    global text_only_filtered_raw
    cmap = ListedColormap(["lightblue", "red", "green"])
    bounds = [-1.5, -0.5, 0.5, 1.5]
    norm = BoundaryNorm(bounds, cmap.N)
    # .copy() avoids pandas' SettingWithCopyWarning when adding fsm_info below
    model_df = text_only_filtered_raw[
        text_only_filtered_raw["Model Name"] == model_name
    ].copy()
    model_df["fsm_info"] = model_df.apply(
        lambda x: f"{x['num_states']} states, {x['num_alphabet']} alphabet", axis=1
    )
    model_df = model_df.sort_values(by=["num_states", "num_alphabet"])
    pivot_df = (
        model_df.pivot_table(
            index="fsm_info",
            columns="substring_index",
            values="parsed_judge_response",
            aggfunc="first",
        )
        .fillna(-1)
        .astype(float)
    )
    # Dynamically adjust figure size
    num_rows, num_cols = pivot_df.shape
    fig_width = max(12, num_cols * 0.5)  # Adjust width per column
    fig_height = max(8, num_rows * 0.4)  # Adjust height per row
    fig, ax = plt.subplots(figsize=(fig_width, fig_height))
    sns.heatmap(
        pivot_df,
        cmap=cmap,
        linewidths=1,
        linecolor="black",
        norm=norm,
        cbar=False,
        square=True,
        ax=ax,
    )
    plt.title(f"Heatmap for Model: {model_name}", fontsize=12)
    plt.xlabel("Substring Index")
    plt.ylabel("FSM (States, Alphabet)")
    plt.xticks(rotation=45)
    sns.despine(ax=ax, top=True, right=True, left=True, bottom=True)
    return fig
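# Color encoding, given bounds = [-1.5, -0.5, 0.5, 1.5] and fillna(-1):
#   -1 (no data for that cell) -> lightblue, 0 (incorrect) -> red,
#    1 (correct) -> green.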
def generate_heatmap_for_specific_model_cot(model_name):
    global text_only_filtered_raw_cot
    cmap = ListedColormap(["lightblue", "red", "green"])
    bounds = [-1.5, -0.5, 0.5, 1.5]
    norm = BoundaryNorm(bounds, cmap.N)
    model_df = text_only_filtered_raw_cot[
        text_only_filtered_raw_cot["Model Name"] == model_name
    ].copy()
    model_df["fsm_info"] = model_df.apply(
        lambda x: f"{x['num_states']} states, {x['num_alphabet']} alphabet", axis=1
    )
    model_df = model_df.sort_values(by=["num_states", "num_alphabet"])
    pivot_df = (
        model_df.pivot_table(
            index="fsm_info",
            columns="substring_index",
            values="parsed_judge_response",
            aggfunc="first",
        )
        .fillna(-1)
        .astype(float)
    )
    # Dynamically adjust figure size
    num_rows, num_cols = pivot_df.shape
    fig_width = max(12, num_cols * 0.5)  # Adjust width per column
    fig_height = max(8, num_rows * 0.4)  # Adjust height per row
    fig, ax = plt.subplots(figsize=(fig_width, fig_height))
    sns.heatmap(
        pivot_df,
        cmap=cmap,
        linewidths=1,
        linecolor="black",
        norm=norm,
        cbar=False,
        square=True,
        ax=ax,
    )
    plt.title(f"Heatmap for Model: {model_name}", fontsize=12)
    plt.xlabel("Substring Index")
    plt.ylabel("FSM (States, Alphabet)")
    plt.xticks(rotation=45)
    sns.despine(ax=ax, top=True, right=True, left=True, bottom=True)
    return fig
def generate_heatmap_for_intersection_model(model_name):
    global intersection_df
    cmap = ListedColormap(["lightblue", "red", "green"])
    bounds = [-1.5, -0.5, 0.5, 1.5]
    norm = BoundaryNorm(bounds, cmap.N)
    # Filter for a specific model
    model_df = intersection_df[intersection_df["model_name"] == model_name].copy()
    if model_df.empty:
        print(f"No data found for model {model_name}. Skipping heatmap generation.")
        return None
    model_df["fsm_info"] = model_df.apply(
        lambda x: f"{x['num_states']} states, {x['num_alphabet']} alphabet", axis=1
    )
    model_df = model_df.sort_values(by=["num_states", "num_alphabet"])
    pivot_df = (
        model_df.pivot_table(
            index="fsm_info",
            columns="substring_index",
            values="parsed_judge_response",
            aggfunc="first",
        )
        .fillna(-1)
        .astype(float)
    )
    # Dynamically adjust figure size
    num_rows, num_cols = pivot_df.shape
    fig_width = max(12, num_cols * 0.5)
    fig_height = max(8, num_rows * 0.4)
    fig, ax = plt.subplots(figsize=(fig_width, fig_height))
    sns.heatmap(
        pivot_df,
        cmap=cmap,
        linewidths=1,
        linecolor="black",
        norm=norm,
        cbar=False,
        square=True,
        ax=ax,
    )
    plt.title(f"Heatmap for Model: {model_name}", fontsize=12)
    plt.xlabel("Substring Index")
    plt.ylabel("FSM (States, Alphabet)")
    plt.xticks(rotation=45)
    sns.despine(ax=ax, top=True, right=True, left=True, bottom=True)
    # Close so the figure is not also drawn by the active pyplot backend;
    # gr.Plot can still render the returned figure object
    plt.close(fig)
    return fig
def show_constraint_heatmap(evt: gr.SelectData):
    return generate_heatmap_for_specific_model(evt.value)


def show_constraint_heatmap_cot(evt: gr.SelectData):
    return generate_heatmap_for_specific_model_cot(evt.value)


def show_intersection_heatmap(evt: gr.SelectData):
    return generate_heatmap_for_intersection_model(evt.value)
with gr.Blocks() as demo:
    gr.Markdown("# FSM Benchmark Leaderboard")

    with gr.Tab("Text-only Benchmark"):
        gr.Markdown("# Text-only Leaderboard (Judged by Qwen)")
        leader_board = gr.Dataframe(accuracy_df_qwen, headers=headers_with_icons)
        gr.Markdown("## Heatmap")
        heatmap_image_qwen = gr.Image(label="", show_label=False)
        leader_board.select(fn=load_heatmap_qwen, outputs=[heatmap_image_qwen])
with gr.Tab("Vision Benchmark", visible=False): | |
gr.Markdown("# Vision Benchmark Leaderboard") | |
leader_board_vision = gr.Dataframe( | |
vision_accuracy_df, headers=headers_with_icons | |
) | |
gr.Markdown("## Heatmap") | |
heatmap_image_vision = gr.Image(label="", show_label=False) | |
leader_board_vision.select( | |
fn=load_vision_heatmap, outputs=[heatmap_image_vision] | |
) | |
with gr.Tab("Text-only Benchmark (CoT)", visible=False): | |
gr.Markdown("# Text-only Leaderboard (CoT)") | |
cot_leader_board_text = gr.Dataframe( | |
cot_text_accuracy_df, headers=headers_with_icons | |
) | |
gr.Markdown("## Heatmap") | |
cot_heatmap_image_text = gr.Image(label="", show_label=False) | |
cot_leader_board_text.select( | |
fn=load_cot_heatmap, outputs=[cot_heatmap_image_text] | |
) | |
    # with gr.Tab("Vision Benchmark (CoT)"):
    #     gr.Markdown("# Vision Benchmark Leaderboard (CoT)")
    #     cot_leader_board_vision = gr.Dataframe(
    #         cot_vision_accuracy_df, headers=headers_with_icons
    #     )
    #     gr.Markdown("## Heatmap")
    #     cot_heatmap_image_vision = gr.Image(label="", show_label=False)
    #     cot_leader_board_vision.select(
    #         fn=load_cot_vision_heatmap, outputs=[cot_heatmap_image_vision]
    #     )
with gr.Tab("Constraint Text-only Results"): | |
gr.Markdown("## Constraint Text-only Leaderboard by first substring") | |
included_models = gr.CheckboxGroup( | |
label="Models to include", | |
choices=all_text_only_model_names, | |
value=all_text_only_model_names, | |
interactive=True, | |
) | |
with gr.Row(): | |
number_of_queries = gr.Textbox(label="Number of included queries") | |
number_of_fsms = gr.Textbox(label="Number of included FSMs") | |
constrained_leader_board_text = gr.Dataframe() | |
constrained_leader_board_plot = gr.Plot() | |
included_models.select( | |
fn=calculate_order_by_first_substring, | |
inputs=[included_models], | |
outputs=[constrained_leader_board_text, number_of_queries, number_of_fsms], | |
queue=True, | |
) | |
with gr.Tab("Constraint Text-only Results (CoT)", visible=False): | |
gr.Markdown("## Constraint Text-only Leaderboard by first substrin (CoT)") | |
included_models_cot = gr.CheckboxGroup( | |
label="Models to include", | |
choices=all_cot_text_only_models, | |
value=all_cot_text_only_models, | |
interactive=True, | |
) | |
with gr.Row(): | |
number_of_queries_cot = gr.Textbox(label="Number of included queries") | |
number_of_fsms_cot = gr.Textbox(label="Number of included FSMs") | |
constrained_leader_board_text_cot = gr.Dataframe() | |
constrained_leader_board_plot_cot = gr.Plot() | |
with gr.Tab("Majority Vote (Subset 1)", visible=False): | |
gr.Markdown("## Majority Vote (Subset 1)") | |
intersection_leader_board = gr.Dataframe( | |
intersection_df_acc, headers=headers_with_icons | |
) | |
heatmap_image = gr.Plot(label="Model Heatmap") | |
with gr.Tab("Text-only Benchmark (deprecated)", visible=False): | |
gr.Markdown("# Text-only Leaderboard") | |
leader_board = gr.Dataframe(accuracy_df, headers=headers_with_icons) | |
gr.Markdown("## Heatmap") | |
heatmap_image = gr.Image(label="", show_label=False) | |
leader_board.select(fn=load_heatmap, outputs=[heatmap_image]) | |
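    # Event listeners can be wired anywhere inside the Blocks context as long
    # as the components already exist; these bindings were left after the tabs.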
    # ============ Callbacks ============
    included_models_cot.select(
        fn=calculate_order_by_first_substring_cot,
        inputs=[included_models_cot],
        outputs=[
            constrained_leader_board_text_cot,
            number_of_queries_cot,
            number_of_fsms_cot,
        ],
        queue=True,
    )

    constrained_leader_board_text.select(
        fn=show_constraint_heatmap, outputs=[constrained_leader_board_plot]
    )
    constrained_leader_board_text_cot.select(
        fn=show_constraint_heatmap_cot, outputs=[constrained_leader_board_plot_cot]
    )
    intersection_leader_board.select(
        fn=show_intersection_heatmap, outputs=[intersection_heatmap_plot]
    )

demo.launch()