Clémentine
tmp
3adea5e
raw
history blame
12.2 kB
import os
import sys
import time
import uuid
import asyncio
from pathlib import Path
from loguru import logger
import gradio as gr
from datasets import load_dataset
from huggingface_hub import whoami
from yourbench_space import PATH
from yourbench_space.utils import (
STAGES,
SubprocessManagerGroup,
save_files,
update_dataset,
map_stage_names,
is_running_locally,
)
from yourbench_space.config import generate_and_save_config
from yourbench_space.evaluation import run_evaluations, create_eval_file
project_description = """
# YourBench 🚀
**Dynamic Benchmark Generation for Language Models**
Quickly create zero-shot benchmarks from your documents – keeping models accurate and adaptable
- 📖 [FAQ](#)
- 💻 [GitHub](https://github.com/huggingface/yourbench)
"""
logger.remove()
logger.add(sys.stderr, level="INFO")
# Global to store all managers per session
MANAGERS = SubprocessManagerGroup()
USER_ID_SESSION_MAP: dict[str, str] = {}
docs_path = Path(__file__).parent / "docs.md"
citation_content = (
docs_path.read_text().split("# Citation")[-1].strip()
if docs_path.exists()
else "# Citation\n\nDocumentation file not found."
)
def generate_and_return(hf_org, hf_dataset_name, session_state: gr.State):
manager = MANAGERS.get(session_state)
if manager is None: # should not be possible
return (
"❌ Config generation failed",
gr.update(visible=False, interactive=False),
)
session_uid = session_state.value
config_path = generate_and_save_config(hf_org, hf_dataset_name, session_uid, manager.config_path)
for _ in range(5):
time.sleep(0.5)
if config_path.exists():
gr.Success("✅ Config generated successfully!")
return (
"✅ Config saved successfully!",
gr.update(value=str(config_path), visible=True, interactive=True),
)
gr.Error("Failed to generate config")
return (
"❌ Config generation failed",
gr.update(visible=False, interactive=False),
)
final_dataset = None
def update_process_status(session_state: gr.State):
"""Update process status and include exit details if process has terminated"""
if session_state is None:
return gr.update(value=False, label="Not running")
manager = MANAGERS.get(session_state.value)
if manager is None:
return gr.update(value=False, label="Not running")
is_running = manager.is_running()
if not is_running:
exit_code, exit_reason = manager.get_exit_details()
status_text = (
f"Process Status: Stopped - {exit_reason}, exit code - {exit_code}"
if exit_reason
else "Process Status: Stopped"
)
return gr.update(value=False, label=status_text)
return gr.update(value=True, label="Process Status: Running")
def prepare_task(session_uid: str, oauth_token: gr.OAuthToken | None, hf_dataset_name: str, _=None):
if oauth_token is None and not is_running_locally():
gr.Warning("You need to log in to use this Space")
return
new_env = os.environ.copy()
if oauth_token:
new_env["HF_TOKEN"] = oauth_token.token
new_env["DATASET_PREFIX"] = hf_dataset_name
MANAGERS.start_process(session_uid, custom_env=new_env)
def update_hf_org_dropdown(oauth_token: gr.OAuthToken | None):
if oauth_token is None:
return gr.Dropdown([], label="Organization")
try:
user_info = whoami(oauth_token.token)
org_names = [org["name"] for org in user_info.get("orgs", [])]
user_name = user_info.get("name", "Unknown User")
org_names.insert(0, user_name)
return gr.Dropdown(org_names, value=user_name, label="Organization")
except Exception as e:
return gr.Dropdown([], label="Organization")
def switch_to_run_generation_tab():
return gr.Tabs(selected=1)
def enable_button(files):
return gr.update(interactive=bool(files))
def run_evaluation_pipeline(oauth_token: gr.OAuthToken | None, org_name, eval_name):
# Test dataset existence
eval_ds_name = f"{org_name}/{eval_name}"
# Test dataset existence
try:
load_dataset(eval_ds_name, streaming=True, token=oauth_token.token)
except Exception as e:
print(f"Error while loading the dataset: {e}")
return
# Run evaluations
create_eval_file(eval_ds_name)
status = asyncio.run(run_evaluations(eval_ds_name=eval_ds_name, org=org_name))
# Create space
from huggingface_hub import HfApi
repo_id = f"{org_name}/leaderboard_yourbench_{eval_ds_name.replace('/', '_')}"
api = HfApi()
try:
api.create_repo(
repo_id=repo_id,
repo_type="space",
space_sdk="gradio",
token=oauth_token.token,
)
api.upload_folder(
repo_id=repo_id,
repo_type="space",
folder_path="src/",
token=oauth_token.token,
)
api.add_space_secret(
repo_id=repo_id,
key="HF_TOKEN",
value=oauth_token.token,
token=oauth_token.token,
)
api.add_space_variable(repo_id=repo_id, key="TASK", value=eval_ds_name, token=oauth_token.token)
api.add_space_variable(repo_id=repo_id, key="ORG_NAME", value=org_name, token=oauth_token.token)
except Exception as e:
status = "Evaluation" + status + "\nLeaderboard creation:" + e
return status
def init_session(profile: gr.OAuthProfile | None):
"""Update session on load"""
if is_running_locally():
username = "local"
elif profile:
username = profile.username
else:
username = None
local_uuid = USER_ID_SESSION_MAP.get(username, str(uuid.uuid4()))
if manager := MANAGERS.get(local_uuid):
if manager.is_running():
logger.info(f"Found existing running session for {local_uuid}, restoring")
return gr.State(local_uuid, delete_callback=lambda uid: MANAGERS.remove(uid))
else:
logger.info(f"Found existing stale session for {local_uuid}, starting new")
MANAGERS.remove(local_uuid)
local_uuid = str(uuid.uuid4())
if username:
USER_ID_SESSION_MAP[username] = local_uuid
MANAGERS.create(local_uuid)
logger.info(f"Started session for {local_uuid}")
return gr.State(local_uuid, delete_callback=lambda uid: MANAGERS.remove(uid))
with gr.Blocks(theme=gr.themes.Default()) as app:
session_state = gr.State()
gr.Markdown(project_description)
with gr.Tabs() as tabs:
with gr.Tab("Setup", id=0):
with gr.Row():
with gr.Accordion("Hugging Face Settings"):
login_btn = gr.LoginButton()
hf_org_dropdown = gr.Dropdown(choices=[], label="Organization", allow_custom_value=True)
app.load(update_hf_org_dropdown, inputs=None, outputs=hf_org_dropdown)
hf_dataset_name = gr.Textbox(
label="Dataset name",
value="yourbench",
info="Name of your new evaluation dataset",
)
with gr.Accordion("Upload Files"):
file_input = gr.File(
label="Upload text files",
file_count="multiple",
file_types=[".txt", ".md", ".html", ".pdf"],
)
output = gr.Textbox(label="Log")
file_input.upload(
save_files,
inputs=[session_state, file_input],
outputs=output,
)
delete_button = gr.Button("Delete Uploaded Files", visible=False)
preview_button = gr.Button("Generate New Config", interactive=False)
log_message = gr.Textbox(label="Log Message", visible=True)
download_button = gr.File(label="Download Config", visible=False, interactive=False)
file_input.change(
lambda files: gr.update(visible=bool(files)),
inputs=file_input,
outputs=delete_button,
)
file_input.change(enable_button, inputs=file_input, outputs=preview_button)
def clean_and_confirm(uid):
MANAGERS.clean_workdir(uid)
return (
"🗑️ All uploaded files have been deleted!",
gr.update(value=None),
gr.update(interactive=False),
)
delete_button.click(
clean_and_confirm,
inputs=session_state,
outputs=[output, file_input, preview_button],
)
preview_button.click(
generate_and_return,
inputs=[hf_org_dropdown, hf_dataset_name, session_state],
outputs=[log_message, download_button],
)
preview_button.click(
switch_to_run_generation_tab,
inputs=None,
outputs=tabs,
)
with gr.Tab("Run Generation", id=1):
with gr.Row():
start_button = gr.Button("Start Task")
stop_button = gr.Button("Stop Task")
kill_button = gr.Button("Kill Task")
start_button.click(prepare_task, inputs=[session_state, login_btn, hf_dataset_name])
stop_button.click(MANAGERS.stop_process, inputs=session_state)
kill_button.click(MANAGERS.kill_process, inputs=session_state)
process_status = gr.Checkbox(label="Process Status", interactive=False)
status_timer = gr.Timer(2.0, active=True)
status_timer.tick(update_process_status, inputs=session_state, outputs=process_status)
with gr.Row():
with gr.Accordion("Stages", open=True):
stages_table = gr.CheckboxGroup(
choices=map_stage_names(STAGES),
value=[],
label="Pipeline Stages Completed",
container=False,
interactive=False,
)
with gr.Row():
with gr.Column():
with gr.Accordion("Log Output", open=True):
log_output = gr.Code(language=None, lines=20, interactive=False)
with gr.Column():
with gr.Accordion("Ingestion Preview"):
ingestion_df = gr.DataFrame()
with gr.Accordion("Summarization Preview"):
summarization_df = gr.DataFrame()
with gr.Accordion("Single Shot Preview"):
single_shot_df = gr.DataFrame()
with gr.Accordion("Multi Hop Preview"):
multi_hop_df = gr.DataFrame()
with gr.Accordion("Lighteval Preview"):
lighteval_df = gr.DataFrame()
stages_table.change(
update_dataset,
inputs=[stages_table, hf_org_dropdown, hf_dataset_name],
outputs=[ingestion_df, summarization_df, single_shot_df, multi_hop_df, lighteval_df],
)
# TODO: this timer should only be active when the second tab is passed to active for the first time
log_timer = gr.Timer(1.0, active=True)
log_timer.tick(
MANAGERS.read_and_get_output,
inputs=session_state,
outputs=[log_output, stages_table],
)
with gr.Tab("Evaluate", id=2):
with gr.Row():
btn_launch_evals = gr.Button("Launch evaluations")
status = gr.Textbox(label="Status")
btn_launch_evals.click(run_evaluation_pipeline, [hf_org_dropdown, hf_dataset_name], status)
app.load(init_session, outputs=session_state)
app.launch(allowed_paths=[PATH])