|
import json |
|
from typing import Any |
|
|
|
import gradio as gr |
|
import numpy as np |
|
import pandas as pd |
|
from datasets import Dataset |
|
from loguru import logger |
|
|
|
from app_configs import CONFIGS, UNSELECTED_PIPELINE_NAME |
|
from components import commons |
|
from components.model_pipeline.model_pipeline import PipelineInterface, PipelineState |
|
from components.typed_dicts import PipelineStateDict |
|
from display.formatting import styled_error |
|
from submission import submit |
|
from workflows import factory |
|
from workflows.qb_agents import QuizBowlBonusAgent |
|
|
|
from . import populate, validation |
|
from .plotting import create_bonus_confidence_plot, create_bonus_html |
|
from .utils import evaluate_prediction |
|
|
|
|
|
def process_bonus_results(results: list[dict]) -> pd.DataFrame: |
|
"""Process results from bonus mode and prepare visualization data.""" |
|
return pd.DataFrame( |
|
[ |
|
{ |
|
"Part": f"Part {r['part_number']}", |
|
"Correct?": "✅" if r["score"] == 1 else "❌", |
|
"Confidence": r["confidence"], |
|
"Prediction": r["answer"], |
|
"Explanation": r["explanation"], |
|
} |
|
for r in results |
|
] |
|
) |
|
|
|
|
|
def initialize_eval_interface(example: dict, model_outputs: list[dict], input_vars: list[str]): |
|
"""Initialize the interface with example text.""" |
|
try: |
|
html_content = create_bonus_html(example["leadin"], example["parts"]) |
|
|
|
|
|
plot_data = create_bonus_confidence_plot(example["parts"], model_outputs) |
|
|
|
|
|
state = {"parts": example["parts"], "outputs": model_outputs} |
|
|
|
|
|
step_outputs = {} |
|
for i, output in enumerate(model_outputs): |
|
key = f"part {i + 1}" |
|
step_outputs[key] = {k: v for k, v in output["step_outputs"].items() if k not in input_vars} |
|
if output["logprob"] is not None: |
|
step_outputs[key]["output_probability"] = float(np.exp(output["logprob"])) |
|
|
|
return html_content, plot_data, state, step_outputs |
|
except Exception as e: |
|
logger.exception(f"Error initializing interface: {e.args}") |
|
return f"<div>Error initializing interface: {str(e)}</div>", pd.DataFrame(), {}, {} |
|
|
|
|
|
class BonusInterface: |
|
"""Gradio interface for the Bonus mode.""" |
|
|
|
def __init__(self, app: gr.Blocks, browser_state: dict, dataset: Dataset, model_options: dict, defaults: dict): |
|
"""Initialize the Bonus interface.""" |
|
logger.info(f"Initializing Bonus interface with dataset size: {len(dataset)}") |
|
self.browser_state = browser_state |
|
self.ds = dataset |
|
self.model_options = model_options |
|
self.app = app |
|
self.defaults = defaults |
|
self.output_state = gr.State(value={}) |
|
self.render() |
|
|
|
|
|
|
|
def load_presaved_pipeline_state(self, browser_state: dict, pipeline_change: bool): |
|
logger.debug(f"Loading presaved pipeline state from browser state:\n{json.dumps(browser_state, indent=4)}") |
|
try: |
|
state_dict = browser_state["bonus"].get("pipeline_state", {}) |
|
pipeline_state = PipelineState.model_validate(state_dict) |
|
pipeline_state_dict = pipeline_state.model_dump() |
|
output_state = browser_state["bonus"].get("output_state", {}) |
|
except Exception as e: |
|
logger.warning(f"Error loading presaved pipeline state: {e}") |
|
output_state = {} |
|
workflow = self.defaults["init_workflow"] |
|
pipeline_state_dict = PipelineState.from_workflow(workflow).model_dump() |
|
return browser_state, not pipeline_change, pipeline_state_dict, output_state |
|
|
|
|
|
def _render_pipeline_interface(self, pipeline_state: PipelineState): |
|
"""Render the model interface.""" |
|
with gr.Row(elem_classes="bonus-header-row form-inline"): |
|
self.pipeline_selector = commons.get_pipeline_selector([]) |
|
self.load_btn = gr.Button("⬇️ Import Pipeline", variant="secondary") |
|
self.import_error_display = gr.HTML(label="Import Error", elem_id="import-error-display", visible=False) |
|
self.pipeline_interface = PipelineInterface( |
|
self.app, |
|
pipeline_state.workflow, |
|
ui_state=pipeline_state.ui_state, |
|
model_options=list(self.model_options.keys()), |
|
config=self.defaults, |
|
) |
|
|
|
def _render_qb_interface(self): |
|
"""Render the quizbowl interface.""" |
|
with gr.Row(elem_classes="bonus-header-row form-inline"): |
|
self.qid_selector = commons.get_qid_selector(len(self.ds)) |
|
self.run_btn = gr.Button("Run on Bonus Question", variant="secondary") |
|
|
|
self.question_display = gr.HTML(label="Question", elem_id="bonus-question-display") |
|
self.error_display = gr.HTML(label="Error", elem_id="bonus-error-display", visible=False) |
|
self.results_table = gr.DataFrame( |
|
label="Model Outputs", |
|
value=pd.DataFrame(columns=["Part", "Correct?", "Confidence", "Prediction", "Explanation"]), |
|
visible=False, |
|
) |
|
self.model_outputs_display = gr.JSON(label="Model Outputs", value="{}", show_indices=True, visible=False) |
|
|
|
with gr.Row(): |
|
self.eval_btn = gr.Button("Evaluate", variant="primary") |
|
|
|
self.model_name_input, self.description_input, self.submit_btn, self.submit_status = ( |
|
commons.get_model_submission_accordion(self.app) |
|
) |
|
|
|
def render(self): |
|
"""Create the Gradio interface.""" |
|
self.hidden_input = gr.Textbox(value="", visible=False, elem_id="hidden-index") |
|
workflow = factory.create_empty_tossup_workflow() |
|
pipeline_state = PipelineState.from_workflow(workflow) |
|
|
|
with gr.Row(): |
|
|
|
with gr.Column(scale=1): |
|
self._render_pipeline_interface(pipeline_state) |
|
|
|
with gr.Column(scale=1): |
|
self._render_qb_interface() |
|
|
|
self._setup_event_listeners() |
|
|
|
def validate_workflow(self, state_dict: PipelineStateDict): |
|
"""Validate the workflow.""" |
|
try: |
|
pipeline_state = PipelineState(**state_dict) |
|
validation.validate_workflow( |
|
pipeline_state.workflow, |
|
required_input_vars=CONFIGS["bonus"]["required_input_vars"], |
|
required_output_vars=CONFIGS["bonus"]["required_output_vars"], |
|
) |
|
except Exception as e: |
|
raise gr.Error(f"Error validating workflow: {str(e)}") |
|
|
|
def get_new_question_html(self, question_id: int): |
|
"""Get the HTML for a new question.""" |
|
if question_id is None: |
|
logger.error("Question ID is None. Setting to 1") |
|
question_id = 1 |
|
try: |
|
question_id = int(question_id) - 1 |
|
if not self.ds or question_id < 0 or question_id >= len(self.ds): |
|
return "Invalid question ID or dataset not loaded" |
|
|
|
example = self.ds[question_id] |
|
leadin = example["leadin"] |
|
parts = example["parts"] |
|
return create_bonus_html(leadin, parts) |
|
except Exception as e: |
|
return f"Error loading question: {str(e)}" |
|
|
|
def get_pipeline_names(self, profile: gr.OAuthProfile | None) -> list[str]: |
|
names = [UNSELECTED_PIPELINE_NAME] + populate.get_pipeline_names("bonus", profile) |
|
return gr.update(choices=names, value=UNSELECTED_PIPELINE_NAME) |
|
|
|
def load_pipeline( |
|
self, model_name: str, pipeline_change: bool, profile: gr.OAuthProfile | None |
|
) -> tuple[str, bool, PipelineStateDict, dict]: |
|
try: |
|
workflow = populate.load_workflow("bonus", model_name, profile) |
|
if workflow is None: |
|
logger.warning(f"Could not load workflow for {model_name}") |
|
return UNSELECTED_PIPELINE_NAME, gr.skip(), gr.skip(), gr.update(visible=False) |
|
pipeline_state_dict = PipelineState.from_workflow(workflow).model_dump() |
|
return UNSELECTED_PIPELINE_NAME, not pipeline_change, pipeline_state_dict, gr.update(visible=True) |
|
except Exception as e: |
|
error_msg = styled_error(f"Error loading pipeline: {str(e)}") |
|
return UNSELECTED_PIPELINE_NAME, gr.skip(), gr.skip(), gr.update(visible=True, value=error_msg) |
|
|
|
|
|
|
|
def get_agent_outputs(self, example: dict, pipeline_state: PipelineState): |
|
"""Get the model outputs for a given question ID.""" |
|
outputs = [] |
|
leadin = example["leadin"] |
|
agent = QuizBowlBonusAgent(pipeline_state.workflow) |
|
|
|
for i, part in enumerate(example["parts"]): |
|
|
|
part_output = agent.run(leadin, part["part"]) |
|
|
|
|
|
part_output["part_number"] = i + 1 |
|
part_output["score"] = evaluate_prediction(part_output["answer"], part["clean_answers"]) |
|
|
|
outputs.append(part_output) |
|
|
|
return outputs |
|
|
|
def single_run( |
|
self, |
|
question_id: int, |
|
state_dict: PipelineStateDict, |
|
) -> tuple[str, Any, Any]: |
|
"""Run the agent in bonus mode and updates the interface. |
|
|
|
Returns: |
|
tuple: Contains the following components: |
|
- question_display: HTML display content of the question |
|
- output_state: Updated state with question parts and outputs |
|
- results_table: DataFrame with model predictions and scores |
|
- model_outputs_display: Detailed step outputs from the model |
|
- error_display: Any error messages (if applicable) |
|
""" |
|
try: |
|
pipeline_state = validation.validate_bonus_workflow(state_dict) |
|
question_id = int(question_id - 1) |
|
if not self.ds or question_id < 0 or question_id >= len(self.ds): |
|
raise gr.Error("Invalid question ID or dataset not loaded") |
|
|
|
example = self.ds[question_id] |
|
outputs = self.get_agent_outputs(example, pipeline_state) |
|
|
|
|
|
html_content, plot_data, output_state, step_outputs = initialize_eval_interface( |
|
example, outputs, pipeline_state.workflow.inputs |
|
) |
|
df = process_bonus_results(outputs) |
|
|
|
return ( |
|
html_content, |
|
gr.update(value=output_state), |
|
gr.update(value=df, label=f"Model Outputs for Question {question_id + 1}", visible=True), |
|
gr.update(value=step_outputs, label=f"Step Outputs for Question {question_id + 1}", visible=True), |
|
gr.update(visible=False), |
|
) |
|
except Exception as e: |
|
import traceback |
|
|
|
error_msg = f"Error: {str(e)}\n{traceback.format_exc()}" |
|
return ( |
|
gr.skip(), |
|
gr.skip(), |
|
gr.update(visible=False), |
|
gr.update(visible=False), |
|
gr.update(visible=True, value=error_msg), |
|
) |
|
|
|
def evaluate(self, state_dict: PipelineStateDict, progress: gr.Progress = gr.Progress()): |
|
"""Evaluate the bonus questions.""" |
|
try: |
|
pipeline_state = validation.validate_bonus_workflow(state_dict) |
|
|
|
if not self.ds or not self.ds.num_rows: |
|
return "No dataset loaded", None, None |
|
|
|
total_correct = 0 |
|
total_parts = 0 |
|
part_scores = [] |
|
part_numbers = [] |
|
|
|
for example in progress.tqdm(self.ds, desc="Evaluating bonus questions"): |
|
model_outputs = self.get_agent_outputs(example, pipeline_state) |
|
|
|
for output in model_outputs: |
|
total_parts += 1 |
|
if output["score"] == 1: |
|
total_correct += 1 |
|
part_scores.append(output["score"]) |
|
part_numbers.append(output["part_number"]) |
|
|
|
accuracy = total_correct / total_parts |
|
df = pd.DataFrame( |
|
[ |
|
{ |
|
"Part Accuracy": f"{accuracy:.2%}", |
|
"Total Score": f"{total_correct}/{total_parts}", |
|
"Questions Evaluated": len(self.ds), |
|
} |
|
] |
|
) |
|
|
|
|
|
return ( |
|
gr.update(value=df, label="Scores on Sample Set"), |
|
gr.update(visible=False), |
|
gr.update(visible=False), |
|
) |
|
except Exception as e: |
|
error_msg = styled_error(f"Error evaluating bonus: {e.args}") |
|
logger.exception(f"Error evaluating bonus: {e.args}") |
|
return gr.skip(), gr.skip(), gr.update(visible=True, value=error_msg) |
|
|
|
def submit_model( |
|
self, |
|
model_name: str, |
|
description: str, |
|
state_dict: PipelineStateDict, |
|
profile: gr.OAuthProfile = None, |
|
): |
|
"""Submit the model output.""" |
|
pipeline_state = PipelineState(**state_dict) |
|
return submit.submit_model(model_name, description, pipeline_state.workflow, "bonus", profile) |
|
|
|
@property |
|
def pipeline_state(self): |
|
return self.pipeline_interface.pipeline_state |
|
|
|
|
|
|
|
def _setup_event_listeners(self): |
|
|
|
|
|
gr.on( |
|
triggers=[self.app.load, self.qid_selector.change], |
|
fn=self.get_new_question_html, |
|
inputs=[self.qid_selector], |
|
outputs=[self.question_display], |
|
) |
|
|
|
gr.on( |
|
triggers=[self.app.load], |
|
fn=self.get_pipeline_names, |
|
outputs=[self.pipeline_selector], |
|
) |
|
|
|
pipeline_change = self.pipeline_interface.pipeline_change |
|
|
|
gr.on( |
|
triggers=[self.app.load], |
|
fn=self.load_presaved_pipeline_state, |
|
inputs=[self.browser_state, pipeline_change], |
|
outputs=[self.browser_state, pipeline_change, self.pipeline_state, self.output_state], |
|
) |
|
self.load_btn.click( |
|
fn=self.load_pipeline, |
|
inputs=[self.pipeline_selector, pipeline_change], |
|
outputs=[self.pipeline_selector, pipeline_change, self.pipeline_state, self.import_error_display], |
|
) |
|
self.pipeline_interface.add_triggers_for_pipeline_export([self.pipeline_state.change], self.pipeline_state) |
|
|
|
self.run_btn.click( |
|
self.single_run, |
|
inputs=[ |
|
self.qid_selector, |
|
self.pipeline_state, |
|
], |
|
outputs=[ |
|
self.question_display, |
|
self.output_state, |
|
self.results_table, |
|
self.model_outputs_display, |
|
self.error_display, |
|
], |
|
) |
|
|
|
self.eval_btn.click( |
|
fn=self.evaluate, |
|
inputs=[self.pipeline_state], |
|
outputs=[self.results_table, self.model_outputs_display, self.error_display], |
|
) |
|
|
|
self.submit_btn.click( |
|
fn=self.submit_model, |
|
inputs=[ |
|
self.model_name_input, |
|
self.description_input, |
|
self.pipeline_state, |
|
], |
|
outputs=[self.submit_status], |
|
) |
|
|