Clémentine
added a singleton-like class to manage all managers per session, plus session state management. Also fixes secret passing to the new leaderboard space
133c6d8
raw
history blame
9.68 kB
import asyncio
import os
import sys
import time
import gradio as gr
import uuid
from datasets import load_dataset
from huggingface_hub import whoami
from loguru import logger
from pathlib import Path
from yourbench_space.config import generate_and_save_config
from yourbench_space.utils import (
SubprocessManagerGroup,
save_files,
update_dataset,
STAGES,
)
from yourbench_space.evaluation import create_eval_file, run_evaluations
from yourbench_space.leaderboard_space.env import HF_TOKEN
project_description = """
# YourBench 🚀
**Dynamic Benchmark Generation for Language Models**
Quickly create zero-shot benchmarks from your documents – keeping models accurate and adaptable
- 📖 [FAQ](#)
- 💻 [GitHub](https://github.com/huggingface/yourbench/tree/v0.2-alpha-space)
"""
logger.remove()
logger.add(sys.stderr, level="INFO")
# Global to store all managers per session
MANAGERS = SubprocessManagerGroup()
docs_path = Path(__file__).parent / "docs.md"
citation_content = (
docs_path.read_text().split("# Citation")[-1].strip()
if docs_path.exists()
else "# Citation\n\nDocumentation file not found."
)
def generate_and_return(hf_org, hf_dataset_name, session_state: gr.State):
manager = MANAGERS.get(session_state.value)
config_path = generate_and_save_config(hf_org, hf_dataset_name, session_state.value, manager.config_path)
for _ in range(5):
time.sleep(0.5)
if config_path.exists():
return (
"✅ Config saved!",
gr.update(value=str(config_path), visible=True, interactive=True),
)
return (
"❌ Config generation failed.",
gr.update(visible=False, interactive=False),
)
final_dataset = None
def update_process_status(session_state: gr.State):
"""Update process status and include exit details if process has terminated"""
manager = MANAGERS.get(session_state.value)
is_running = manager.is_running()
if not is_running:
exit_code, exit_reason = manager.get_exit_details()
status_text = f"Process Status: Stopped - {exit_reason}, exit code - {exit_code}" if exit_reason else "Process Status: Stopped"
return gr.update(value=False, label=status_text)
return gr.update(value=True, label="Process Status: Running")
def prepare_task(session_state: gr.State, oauth_token: gr.OAuthToken | None, hf_dataset_name: str, _=None):
manager = MANAGERS.get(session_state.value)
new_env = os.environ.copy()
if oauth_token:
new_env["HF_TOKEN"] = oauth_token.token
new_env["DATASET_PREFIX"] = hf_dataset_name
manager.start_process(custom_env=new_env)
def update_hf_org_dropdown(oauth_token: gr.OAuthToken | None):
if oauth_token is None:
return gr.Dropdown([], label="Organization")
try:
user_info = whoami(oauth_token.token)
org_names = [org["name"] for org in user_info.get("orgs", [])]
user_name = user_info.get("name", "Unknown User")
org_names.insert(0, user_name)
return gr.Dropdown(org_names, value=user_name, label="Organization")
except Exception as e:
return gr.Dropdown([], label="Organization")
def switch_to_run_generation_tab():
return gr.Tabs(selected=1)
def enable_button(files):
return gr.update(interactive=bool(files))
def run_evaluation_pipeline(oauth_token: gr.OAuthToken | None, org_name, eval_name):
# Test dataset existence
eval_ds_name = f"{org_name}/{eval_name}"
# Test dataset existence
try:
load_dataset(eval_ds_name, streaming=True)
except Exception as e:
print(f"Error while loading the dataset: {e}")
return
# Run evaluations
create_eval_file(eval_ds_name)
status = asyncio.run(run_evaluations(eval_ds_name=eval_ds_name, org=org_name))
# Create space
from huggingface_hub import HfApi
repo_id = f"{org_name}/leaderboard_yourbench_{eval_ds_name.replace('/', '_')}"
api = HfApi()
try:
api.create_repo(repo_id=repo_id, repo_type="space", space_sdk="gradio", token=oauth_token.token)
api.upload_folder(repo_id=repo_id, repo_type="space", folder_path="src/", token=oauth_token.token)
api.add_space_secret(repo_id=repo_id, key="HF_TOKEN", value=oauth_token.token, token=oauth_token.token)
api.add_space_variable(repo_id=repo_id, key="TASK", value=eval_ds_name, token=oauth_token.token)
api.add_space_variable(repo_id=repo_id, key="ORG_NAME", value=org_name, token=oauth_token.token)
except Exception as e:
status = "Evaluation" + status + "\nLeaderboard creation:" + e
return status
with gr.Blocks(theme=gr.themes.Default()) as app:
# We initialize the session state with the user randomly generated uuid
# Using uuid4 makes collision cases extremely unlikely even for concurrent users
session_state = gr.State(uuid.uuid4(), delete_callback=lambda uid: MANAGERS.remove(uid))
MANAGERS.create(session_state.value)
gr.Markdown(project_description)
with gr.Tabs() as tabs:
with gr.Tab("Setup", id=0):
with gr.Row():
with gr.Accordion("Hugging Face Settings"):
login_btn = gr.LoginButton()
hf_org_dropdown = gr.Dropdown(
choices=[], label="Organization", allow_custom_value=True
)
app.load(
update_hf_org_dropdown, inputs=None, outputs=hf_org_dropdown
)
hf_dataset_name = gr.Textbox(
label="Dataset name",
value="yourbench",
info="Name of your new evaluation dataset",
)
with gr.Accordion("Upload documents"):
file_input = gr.File(
label="Upload text files",
file_count="multiple",
file_types=[".txt", ".md", ".html", ".pdf"],
)
output = gr.Textbox(label="Log")
file_input.upload(
lambda files: save_files(session_state, [file.name for file in files]),
file_input,
output,
)
preview_button = gr.Button("Generate New Config", interactive=False)
log_message = gr.Textbox(label="Log Message", visible=True)
download_button = gr.File(
label="Download Config", visible=False, interactive=False
)
file_input.change(enable_button, inputs=file_input, outputs=preview_button)
preview_button.click(
generate_and_return,
inputs=[hf_org_dropdown, hf_dataset_name, session_state],
outputs=[log_message, download_button],
)
preview_button.click(
switch_to_run_generation_tab,
inputs=None,
outputs=tabs,
)
with gr.Tab("Run Generation", id=1):
with gr.Row():
start_button = gr.Button("Start Task")
start_button.click(prepare_task, inputs=[session_state, login_btn, hf_dataset_name])
stop_button = gr.Button("Stop Task")
stop_button.click(MANAGERS.stop_process, inputs=session_state)
kill_button = gr.Button("Kill Task")
kill_button.click(MANAGERS.kill_process, inputs=session_state)
with gr.Row():
with gr.Column():
with gr.Accordion("Log Output", open=True):
log_output = gr.Code(language=None, lines=20, interactive=False)
process_status = gr.Checkbox(label="Process Status", interactive=False)
status_timer = gr.Timer(1.0, active=True)
status_timer.tick(update_process_status, inputs=session_state, outputs=process_status)
with gr.Column():
with gr.Accordion("Stages", open=True):
stages_table = gr.CheckboxGroup(
choices=STAGES,
value=[],
label="Pipeline Stages Completed",
interactive=False,
)
with gr.Accordion("Ingestion"):
ingestion_df = gr.DataFrame()
with gr.Accordion("Summarization"):
summarization_df = gr.DataFrame()
with gr.Accordion("Single-Hop"):
single_hop = gr.DataFrame()
with gr.Accordion("Answer Generation"):
answers_df = gr.DataFrame()
stages_table.change(
update_dataset, inputs=[stages_table, hf_org_dropdown, hf_dataset_name], outputs=[ingestion_df, summarization_df, single_hop, answers_df]
)
log_timer = gr.Timer(1.0, active=True)
log_timer.tick(
MANAGERS.read_and_get_output, inputs=session_state, outputs=[log_output, stages_table]
)
with gr.Tab("Evaluate", id=2):
with gr.Row():
btn_launch_evals = gr.Button("Launch evaluations")
status = gr.Textbox(label="Status")
btn_launch_evals.click(run_evaluation_pipeline, [hf_org_dropdown, hf_dataset_name], status)
app.launch(allowed_paths=["/app"])