Maharshi Gor
Updated workflow APIs, code clean up and minor functions for hf pipeline support
f064c62
import json | |
from typing import Any | |
import gradio as gr | |
import numpy as np | |
import pandas as pd | |
from datasets import Dataset | |
from loguru import logger | |
from app_configs import CONFIGS, UNSELECTED_PIPELINE_NAME | |
from components import commons | |
from components.model_pipeline.model_pipeline import PipelineInterface, PipelineState | |
from components.typed_dicts import PipelineStateDict | |
from display.formatting import styled_error | |
from shared.workflows import factory | |
from shared.workflows.metrics import evaluate_prediction | |
from shared.workflows.qb_agents import QuizBowlBonusAgent | |
from shared.workflows.runners import run_and_eval_bonus_dataset, run_and_evaluate_bonus | |
from submission import submit | |
from . import populate, validation | |
from .plotting import create_bonus_confidence_plot, create_bonus_html | |
from .utils import create_error_message | |
from .validation import UserInputWorkflowValidator | |
def process_bonus_results(results: list[dict]) -> pd.DataFrame:
    """Build the per-part results table shown after running a bonus question.

    Args:
        results: One dict per bonus part. Each dict is read for the keys
            ``number``, ``correct``, ``confidence``, ``guess`` and ``explanation``.

    Returns:
        A DataFrame with the columns ``Part``, ``Correct?``, ``Confidence``,
        ``Prediction`` and ``Explanation`` (in that order), one row per part.
        An empty ``results`` list yields an empty frame that still has these columns.
    """
    # Pin the column order explicitly so that an empty result list still yields a
    # frame with the expected header instead of a column-less DataFrame.
    columns = ["Part", "Correct?", "Confidence", "Prediction", "Explanation"]
    rows = [
        {
            "Part": f"Part {r['number']}",
            "Correct?": "✅" if r["correct"] == 1 else "❌",
            "Confidence": r["confidence"],
            "Prediction": r["guess"],
            "Explanation": r["explanation"],
        }
        for r in results
    ]
    return pd.DataFrame(rows, columns=columns)
def initialize_eval_interface(example: dict, part_outputs: list[dict], input_vars: list[str]):
    """Prepare everything the UI needs to display one evaluated bonus question.

    Args:
        example: Question record; its ``leadin`` and ``parts`` entries are read.
        part_outputs: One model-output dict per part. Each is read for
            ``step_outputs`` (a mapping) and ``logprob`` (may be ``None``).
        input_vars: Variable names to drop from the displayed step outputs.

    Returns:
        A 4-tuple ``(html_content, plot_data, state, step_outputs)``. On any
        exception the error is logged and the tuple
        ``(styled error HTML, empty DataFrame, {}, {})`` is returned instead.
    """
    try:
        leadin = example["leadin"]
        parts = example["parts"]

        html_content = create_bonus_html(leadin, parts)
        plot_data = create_bonus_confidence_plot(parts, part_outputs)
        state = {"parts": parts, "outputs": part_outputs}

        # Per-part step outputs, minus the workflow inputs, keyed "part 1", "part 2", ...
        step_outputs = {}
        for part_number, part_output in enumerate(part_outputs, start=1):
            shown = {
                name: value for name, value in part_output["step_outputs"].items() if name not in input_vars
            }
            logprob = part_output["logprob"]
            if logprob is not None:
                # Convert the log-probability back into a plain probability for display.
                shown["output_probability"] = float(np.exp(logprob))
            step_outputs[f"part {part_number}"] = shown

        return html_content, plot_data, state, step_outputs
    except Exception as e:
        error_msg = f"Error initializing interface: {str(e)}"
        logger.exception(error_msg)
        return styled_error(error_msg), pd.DataFrame(), {}, {}
class BonusInterface:
    """Gradio interface for the Bonus mode.

    Builds the pipeline-editing panel and the question/evaluation panel, and wires
    the Gradio events between them. All components are created in ``render()``,
    which is invoked from ``__init__``.
    """

    def __init__(self, app: gr.Blocks, browser_state: dict, dataset: Dataset, model_options: dict, defaults: dict):
        """Store the collaborators and immediately render the interface.

        Args:
            app: The enclosing Gradio Blocks app; its ``load`` event is used as a trigger.
            browser_state: Passed as an event input/output when restoring pipeline state.
            dataset: Bonus questions; rows are read for ``leadin`` and ``parts``.
            model_options: Mapping whose keys are offered as model choices.
            defaults: Configuration; ``init_workflow`` is read for the default workflow.
        """
        logger.info(f"Initializing Bonus interface with dataset size: {len(dataset)}")
        self.browser_state = browser_state
        self.ds = dataset
        self.model_options = model_options
        self.app = app
        self.defaults = defaults
        self.output_state = gr.State(value={})
        self.render()

    # ------------------------------------- LOAD PIPELINE STATE FROM BROWSER STATE -------------------------------------

    def load_default_workflow(self):
        """Return ``(pipeline_state_dict, output_state)`` for the configured default workflow."""
        workflow = self.defaults["init_workflow"]
        pipeline_state_dict = PipelineState.from_workflow(workflow).model_dump()
        return pipeline_state_dict, {}

    def load_presaved_pipeline_state(self, browser_state: dict, pipeline_change: bool):
        """Restore the pipeline state saved under ``browser_state["bonus"]``.

        Falls back to the default workflow when nothing is saved or when the saved
        state cannot be read or validated (the failure is logged as a warning).

        Returns:
            ``(browser_state, toggled pipeline_change, pipeline_state_dict, output_state)``.
        """
        try:
            state_dict = browser_state["bonus"].get("pipeline_state", {})
            if state_dict:
                pipeline_state = PipelineState.model_validate(state_dict)
                pipeline_state_dict = pipeline_state.model_dump()
                output_state = browser_state["bonus"].get("output_state", {})
            else:
                pipeline_state_dict, output_state = self.load_default_workflow()
        except Exception as e:
            logger.warning(f"Error loading presaved pipeline state: {e}")
            pipeline_state_dict, output_state = self.load_default_workflow()
        # Flipping the flag changes its value, which is what signals the update downstream.
        return browser_state, not pipeline_change, pipeline_state_dict, output_state

    # ------------------------------------------ INTERFACE RENDER FUNCTIONS -------------------------------------------

    def _render_pipeline_interface(self, pipeline_state: PipelineState):
        """Render the pipeline selector, import button and the pipeline editor."""
        with gr.Row(elem_classes="bonus-header-row form-inline"):
            self.pipeline_selector = commons.get_pipeline_selector([])
            self.load_btn = gr.Button("⬇️ Import Pipeline", variant="secondary")
        self.import_error_display = gr.HTML(label="Import Error", elem_id="import-error-display", visible=False)
        logger.info(f"Rendering {self.__class__.__name__} with pipeline state: {pipeline_state}")
        self.pipeline_interface = PipelineInterface(
            self.app,
            pipeline_state.workflow,
            ui_state=pipeline_state.ui_state,
            model_options=list(self.model_options.keys()),
            config=self.defaults,
            validator=UserInputWorkflowValidator("bonus"),
        )

    def _render_qb_interface(self):
        """Render the question selector, result displays, and evaluate/submit controls."""
        with gr.Row(elem_classes="bonus-header-row form-inline"):
            self.qid_selector = commons.get_qid_selector(len(self.ds))
            self.run_btn = gr.Button("Run on Bonus Question", variant="secondary")
        self.question_display = gr.HTML(label="Question", elem_id="bonus-question-display")
        self.error_display = gr.HTML(label="Error", elem_id="bonus-error-display", visible=False)
        self.results_table = gr.DataFrame(
            label="Model Outputs",
            value=pd.DataFrame(columns=["Part", "Correct?", "Confidence", "Prediction", "Explanation"]),
            visible=False,
        )
        self.model_outputs_display = gr.JSON(label="Model Outputs", value="{}", show_indices=True, visible=False)

        with gr.Row():
            self.eval_btn = gr.Button("Evaluate", variant="primary")

        self.model_name_input, self.description_input, self.submit_btn, self.submit_status = (
            commons.get_model_submission_accordion(self.app)
        )

    def render(self):
        """Create the Gradio components and register the event listeners."""
        self.hidden_input = gr.Textbox(value="", visible=False, elem_id="hidden-index")
        workflow = factory.create_empty_bonus_workflow()
        pipeline_state = PipelineState.from_workflow(workflow)

        with gr.Row():
            # Model Panel
            with gr.Column(scale=1):
                self._render_pipeline_interface(pipeline_state)

            with gr.Column(scale=1):
                self._render_qb_interface()

        self._setup_event_listeners()

    def validate_workflow(self, state_dict: PipelineStateDict):
        """Validate the workflow in ``state_dict`` against the bonus-mode requirements.

        Raises:
            gr.Error: If the state cannot be parsed or the workflow fails validation.
        """
        try:
            pipeline_state = PipelineState(**state_dict)
            validation.validate_workflow(
                pipeline_state.workflow,
                required_input_vars=CONFIGS["bonus"]["required_input_vars"],
                required_output_vars=CONFIGS["bonus"]["required_output_vars"],
            )
        except Exception as e:
            # Chain the original exception so the root cause stays in the traceback.
            raise gr.Error(f"Error validating workflow: {str(e)}") from e

    def get_new_question_html(self, question_id: int):
        """Return the HTML for the question with the given 1-based ID.

        A ``None`` ID is treated as 1. Out-of-range IDs, a missing dataset, or any
        error while loading produce a plain message string instead of raising.
        """
        if question_id is None:
            logger.error("Question ID is None. Setting to 1")
            question_id = 1
        try:
            # The selector is 1-based; dataset rows are 0-based.
            question_id = int(question_id) - 1
            if not self.ds or question_id < 0 or question_id >= len(self.ds):
                return "Invalid question ID or dataset not loaded"
            example = self.ds[question_id]
            leadin = example["leadin"]
            parts = example["parts"]
            return create_bonus_html(leadin, parts)
        except Exception as e:
            return f"Error loading question: {str(e)}"

    def get_pipeline_names(self, profile: gr.OAuthProfile | None) -> Any:
        """Return a Gradio update listing the importable bonus pipelines.

        The choices are the "unselected" placeholder followed by the names from
        ``populate.get_pipeline_names``; the placeholder is selected.
        """
        names = [UNSELECTED_PIPELINE_NAME] + populate.get_pipeline_names("bonus", profile)
        return gr.update(choices=names, value=UNSELECTED_PIPELINE_NAME)

    def load_pipeline(
        self, model_name: str, pipeline_change: bool, profile: gr.OAuthProfile | None
    ) -> tuple[str, bool, PipelineStateDict, dict]:
        """Import a saved pipeline by name.

        Returns:
            Values for ``(pipeline_selector, pipeline_change, pipeline_state,
            import_error_display)``. The selector is always reset to the placeholder.
            On success the change flag is toggled, the new state is emitted and the
            error display is hidden. If no workflow is found the state is left
            untouched. On an exception the state is left untouched and the error is
            shown in the import error display.
        """
        try:
            workflow = populate.load_workflow("bonus", model_name, profile)
            if workflow is None:
                logger.warning(f"Could not load workflow for {model_name}")
                return UNSELECTED_PIPELINE_NAME, gr.skip(), gr.skip(), gr.update(visible=False)
            pipeline_state_dict = PipelineState.from_workflow(workflow).model_dump()
            # Hide the error display on success so a previous import error does not linger.
            return UNSELECTED_PIPELINE_NAME, not pipeline_change, pipeline_state_dict, gr.update(visible=False)
        except Exception as e:
            error_msg = styled_error(f"Error loading pipeline: {str(e)}")
            return UNSELECTED_PIPELINE_NAME, gr.skip(), gr.skip(), gr.update(visible=True, value=error_msg)

    # ------------------------------------- Agent Functions -----------------------------------------------------------

    def single_run(
        self,
        question_id: int,
        state_dict: PipelineStateDict,
    ) -> tuple[Any, Any, Any, Any, Any]:
        """Run the agent in bonus mode and updates the interface.

        Args:
            question_id: 1-based ID of the question to run.
            state_dict: Serialized pipeline state to validate and execute.

        Returns:
            tuple: Contains the following components:
                - question_display: HTML display content of the question
                - output_state: Updated state with question parts and outputs
                - results_table: DataFrame with model predictions and scores
                - model_outputs_display: Detailed step outputs from the model
                - error_display: Any error messages (if applicable)

            On failure the first two are skipped, the tables are hidden and the
            error display is shown with the formatted error.
        """
        try:
            pipeline_state = validation.validate_bonus_workflow(state_dict)
            # Convert before subtracting, matching get_new_question_html.
            question_id = int(question_id) - 1
            if not self.ds or question_id < 0 or question_id >= len(self.ds):
                raise gr.Error("Invalid question ID or dataset not loaded")
            example = self.ds[question_id]
            agent = QuizBowlBonusAgent(pipeline_state.workflow)
            model_output = run_and_evaluate_bonus(agent, example, return_extras=True)
            part_outputs = model_output["part_outputs"]

            # Process results and prepare visualization data
            html_content, plot_data, output_state, step_outputs = initialize_eval_interface(
                example, part_outputs, pipeline_state.workflow.inputs
            )
            df = process_bonus_results(part_outputs)

            return (
                html_content,
                gr.update(value=output_state),
                gr.update(value=df, label=f"Model Outputs for Question {question_id + 1}", visible=True),
                gr.update(value=step_outputs, label=f"Step Outputs for Question {question_id + 1}", visible=True),
                gr.update(visible=False),
            )
        except Exception as e:
            error_msg = styled_error(create_error_message(e))
            logger.exception(f"Error running bonus: {e}")
            return (
                gr.skip(),
                gr.skip(),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(visible=True, value=error_msg),
            )

    @staticmethod
    def _score_bonus_outputs(model_outputs) -> tuple[int, int, int]:
        """Tally ``(parts correct, total parts, questions fully correct)`` over all outputs."""
        n_parts_correct = 0
        total_parts = 0
        n_questions_correct = 0
        for model_output in model_outputs:
            part_outputs = model_output["part_outputs"]
            # Count per question: a question is correct only if all of *its* parts are.
            n_correct_here = sum(output["correct"] for output in part_outputs)
            n_parts_correct += n_correct_here
            total_parts += len(part_outputs)
            n_questions_correct += int(n_correct_here == len(part_outputs))
        return n_parts_correct, total_parts, n_questions_correct

    def evaluate(self, state_dict: PipelineStateDict, progress: gr.Progress = gr.Progress()):
        """Evaluate the pipeline on the whole bonus dataset.

        Args:
            state_dict: Serialized pipeline state to validate and execute.
            progress: Gradio progress tracker; its ``tqdm`` is handed to the runner.

        Returns:
            Updates for ``(results_table, model_outputs_display, error_display)``.
            On success the table holds a single row with question accuracy, part
            accuracy and the number of questions evaluated. If no dataset is loaded
            or an exception occurs, the first two are skipped and the error display
            is shown.
        """
        try:
            pipeline_state = validation.validate_bonus_workflow(state_dict)

            # Validate inputs
            if not self.ds or not self.ds.num_rows:
                # Report through the error display, like the exception path below.
                error_msg = styled_error("No dataset loaded")
                return gr.skip(), gr.skip(), gr.update(visible=True, value=error_msg)

            agent = QuizBowlBonusAgent(pipeline_state.workflow)
            model_outputs = run_and_eval_bonus_dataset(
                agent, self.ds, num_workers=2, return_extras=True, tqdm_provider=progress.tqdm
            )

            n_parts_correct, total_parts, n_questions_correct = self._score_bonus_outputs(model_outputs)

            # Guard against a run that produced no parts at all.
            p_accuracy = n_parts_correct / total_parts if total_parts else 0.0
            q_accuracy = n_questions_correct / len(self.ds)

            df = pd.DataFrame(
                [
                    {
                        "Question Accuracy": f"{q_accuracy:.2%}",
                        "Part Accuracy": f"{p_accuracy:.2%}",
                        "Questions Evaluated": len(self.ds),
                    }
                ]
            )

            return (
                gr.update(value=df, label="Scores on Sample Set"),
                gr.update(visible=False),
                gr.update(visible=False),
            )
        except Exception as e:
            error_msg = styled_error(create_error_message(e))
            logger.exception(f"Error evaluating bonus: {e}")
            return gr.skip(), gr.skip(), gr.update(visible=True, value=error_msg)

    # NOTE(review): the `profile` hint is non-optional although its default is None.
    # It is left untouched because Gradio appears to inspect this annotation when
    # injecting the OAuth profile — confirm before changing it.
    def submit_model(
        self,
        model_name: str,
        description: str,
        state_dict: PipelineStateDict,
        profile: gr.OAuthProfile = None,
    ):
        """Submit the workflow in ``state_dict`` as a bonus-mode model.

        Returns:
            Whatever ``submit.submit_model`` returns; it is routed to the submit status.
        """
        pipeline_state = PipelineState(**state_dict)
        return submit.submit_model(model_name, description, pipeline_state.workflow, "bonus", profile)

    @property
    def pipeline_state(self):
        """The ``pipeline_state`` object owned by the pipeline interface.

        Exposed as a property because the event wiring in this class uses it as an
        attribute (``self.pipeline_state.change`` and in input/output lists).
        """
        return self.pipeline_interface.pipeline_state

    # ------------------------------------- Event Listeners -----------------------------------------------------------

    def _setup_event_listeners(self):
        """Connect component events to the handler methods of this class."""
        # Show the selected question on app load and whenever the selector changes.
        gr.on(
            triggers=[self.app.load, self.qid_selector.change],
            fn=self.get_new_question_html,
            inputs=[self.qid_selector],
            outputs=[self.question_display],
        )

        # Populate the pipeline selector on app load.
        gr.on(
            triggers=[self.app.load],
            fn=self.get_pipeline_names,
            outputs=[self.pipeline_selector],
        )

        pipeline_change = self.pipeline_interface.pipeline_change

        # Restore the saved pipeline state on app load.
        gr.on(
            triggers=[self.app.load],
            fn=self.load_presaved_pipeline_state,
            inputs=[self.browser_state, pipeline_change],
            outputs=[self.browser_state, pipeline_change, self.pipeline_state, self.output_state],
        )

        self.load_btn.click(
            fn=self.load_pipeline,
            inputs=[self.pipeline_selector, pipeline_change],
            outputs=[self.pipeline_selector, pipeline_change, self.pipeline_state, self.import_error_display],
        )

        self.pipeline_interface.add_triggers_for_pipeline_export([self.pipeline_state.change], self.pipeline_state)

        self.run_btn.click(
            self.single_run,
            inputs=[
                self.qid_selector,
                self.pipeline_state,
            ],
            outputs=[
                self.question_display,
                self.output_state,
                self.results_table,
                self.model_outputs_display,
                self.error_display,
            ],
        )

        self.eval_btn.click(
            fn=self.evaluate,
            inputs=[self.pipeline_state],
            outputs=[self.results_table, self.model_outputs_display, self.error_display],
        )

        self.submit_btn.click(
            fn=self.submit_model,
            inputs=[
                self.model_name_input,
                self.description_input,
                self.pipeline_state,
            ],
            outputs=[self.submit_status],
        )