Clémentine
add log to debug
aaaafe3
raw
history blame
10.1 kB
import asyncio
import os
import sys
import time
import gradio as gr
import uuid
from datasets import load_dataset
from huggingface_hub import whoami
from loguru import logger
from pathlib import Path
from yourbench_space.config import generate_and_save_config
from yourbench_space.utils import (
SubprocessManagerGroup,
save_files,
update_dataset,
STAGES,
)
from yourbench_space.evaluation import create_eval_file, run_evaluations
from yourbench_space.leaderboard_space.env import HF_TOKEN
# Markdown header rendered at the top of the app.
project_description = """
# YourBench 🚀
**Dynamic Benchmark Generation for Language Models**
Quickly create zero-shot benchmarks from your documents – keeping models accurate and adaptable
- 📖 [FAQ](#)
- 💻 [GitHub](https://github.com/huggingface/yourbench/tree/v0.2-alpha-space)
"""

# Drop the handlers configured so far and log to stderr at INFO level only.
logger.remove()
logger.add(sys.stderr, level="INFO")

# Global to store all managers per session
MANAGERS = SubprocessManagerGroup()

# Text that follows the last "# Citation" heading of the docs.md file sitting
# next to this module, or a placeholder when that file is missing.
docs_path = Path(__file__).parent / "docs.md"
citation_content = (
    # Explicit encoding so the result does not depend on the host locale.
    docs_path.read_text(encoding="utf-8").split("# Citation")[-1].strip()
    if docs_path.exists()
    else "# Citation\n\nDocumentation file not found."
)
def generate_and_return(hf_org, hf_dataset_name, session_state: gr.State):
    """Generate the config file for the current session and expose it for download.

    Args:
        hf_org: Organization (or user) name forwarded to ``generate_and_save_config``.
        hf_dataset_name: Dataset name forwarded to ``generate_and_save_config``.
        session_state: Session state; its ``.value`` is used as the session uid.

    Returns:
        A ``(log message, gr.update)`` pair. On success the update makes the
        download component visible and points it at the generated config path;
        on failure it hides the component.
    """
    manager = MANAGERS.get(session_state)
    if manager is None:
        # NOTE(review): assumes MANAGERS.get may return None for an unknown
        # session — confirm against SubprocessManagerGroup. Without this guard
        # the attribute access below would raise instead of reporting failure.
        return (
            "❌ Config generation failed.",
            gr.update(visible=False, interactive=False),
        )

    session_uid = session_state.value
    config_path = generate_and_save_config(hf_org, hf_dataset_name, session_uid, manager.config_path)

    # Check immediately, then poll up to 5 more times at 0.5 s intervals.
    # The maximum wait (2.5 s) is unchanged, but a config that is already on
    # disk no longer costs an unconditional half-second delay.
    for attempt in range(6):
        if attempt:
            time.sleep(0.5)
        if config_path.exists():
            return (
                "✅ Config saved!",
                gr.update(value=str(config_path), visible=True, interactive=True),
            )

    return (
        "❌ Config generation failed.",
        gr.update(visible=False, interactive=False),
    )
# Module-level placeholder; nothing in the visible part of this file reads or
# reassigns it. NOTE(review): possibly a leftover — confirm before removing.
final_dataset = None
def update_process_status(session_state: gr.State):
    """Build the checkbox update describing whether the session's process runs.

    Returns an unchecked "Not running" update when there is no session state.
    Otherwise the session's manager is queried: a running process yields a
    checked box, a stopped one yields an unchecked box whose label carries the
    exit reason and exit code when a reason is available.
    """
    if session_state is None:
        return gr.update(value=False, label="Not running")

    manager = MANAGERS.get(session_state.value)
    if manager.is_running():
        return gr.update(value=True, label="Process Status: Running")

    # Process is stopped: include the exit details only if a reason was reported.
    exit_code, exit_reason = manager.get_exit_details()
    if exit_reason:
        label = f"Process Status: Stopped - {exit_reason}, exit code - {exit_code}"
    else:
        label = "Process Status: Stopped"
    return gr.update(value=False, label=label)
def prepare_task(session_uid: str, oauth_token: gr.OAuthToken | None, hf_dataset_name: str, _=None):
    """Start the session's subprocess with an environment tailored to the user.

    The child environment is a copy of the current one, with ``HF_TOKEN`` set
    from the OAuth token (when one is provided) and ``DATASET_PREFIX`` set to
    ``hf_dataset_name``. The trailing ``_`` parameter is accepted and ignored.
    """
    child_env = dict(os.environ)
    if oauth_token:
        child_env.update(HF_TOKEN=oauth_token.token)
    child_env.update(DATASET_PREFIX=hf_dataset_name)

    MANAGERS.start_process(session_uid, custom_env=child_env)
def update_hf_org_dropdown(oauth_token: gr.OAuthToken | None):
    """Populate the organization dropdown from the logged-in user's account.

    Args:
        oauth_token: OAuth token of the current user, or ``None`` when nobody
            is logged in.

    Returns:
        A ``gr.Dropdown`` listing the user's own name first (pre-selected)
        followed by their organizations, or an empty dropdown when there is no
        token or the account lookup fails.
    """
    if oauth_token is None:
        return gr.Dropdown([], label="Organization")

    try:
        user_info = whoami(oauth_token.token)
        org_names = [org["name"] for org in user_info.get("orgs", [])]
        user_name = user_info.get("name", "Unknown User")
        # The user's personal namespace goes first and is the default choice.
        org_names.insert(0, user_name)
        return gr.Dropdown(org_names, value=user_name, label="Organization")
    except Exception as e:
        # Best effort: keep the UI usable, but leave a trace of why the list
        # is empty instead of swallowing the error silently.
        logger.warning(f"Could not retrieve organizations for the current user: {e}")
        return gr.Dropdown([], label="Organization")
def switch_to_run_generation_tab():
    """Return a ``gr.Tabs`` update that selects the tab whose id is 1."""
    run_generation_tab_id = 1
    return gr.Tabs(selected=run_generation_tab_id)
def enable_button(files):
    """Return an update making the target interactive only when ``files`` is truthy."""
    has_files = bool(files)
    return gr.update(interactive=has_files)
def run_evaluation_pipeline(oauth_token: gr.OAuthToken | None, org_name, eval_name):
    """Run the evaluations for a dataset and publish a leaderboard Space.

    Args:
        oauth_token: OAuth token of the current user; used for every Hub call.
        org_name: Organization (or user) name owning the dataset and the Space.
        eval_name: Name of the evaluation dataset inside ``org_name``.

    Returns:
        A status string for the UI: the evaluation status, extended with the
        leaderboard creation error when publishing the Space fails, or an
        error message when the user is not logged in or the dataset cannot be
        loaded.
    """
    # Every Hub call below needs the user's token, so fail early with a
    # readable message instead of an AttributeError on ``None``.
    if oauth_token is None:
        message = "You need to log in before launching evaluations."
        logger.error(message)
        return message

    eval_ds_name = f"{org_name}/{eval_name}"

    # Test dataset existence
    try:
        load_dataset(eval_ds_name, streaming=True, token=oauth_token.token)
    except Exception as e:
        message = f"Error while loading the dataset: {e}"
        logger.error(message)
        # Surface the failure in the status box rather than returning nothing.
        return message

    # Run evaluations
    create_eval_file(eval_ds_name)
    status = asyncio.run(run_evaluations(eval_ds_name=eval_ds_name, org=org_name))

    # Create space
    from huggingface_hub import HfApi

    repo_id = f"{org_name}/leaderboard_yourbench_{eval_ds_name.replace('/', '_')}"
    api = HfApi()
    try:
        api.create_repo(repo_id=repo_id, repo_type="space", space_sdk="gradio", token=oauth_token.token)
        api.upload_folder(repo_id=repo_id, repo_type="space", folder_path="src/", token=oauth_token.token)
        # The new Space receives the user's token as a secret plus the task
        # and organization names as variables.
        api.add_space_secret(repo_id=repo_id, key="HF_TOKEN", value=oauth_token.token, token=oauth_token.token)
        api.add_space_variable(repo_id=repo_id, key="TASK", value=eval_ds_name, token=oauth_token.token)
        api.add_space_variable(repo_id=repo_id, key="ORG_NAME", value=org_name, token=oauth_token.token)
    except Exception as e:
        # Format instead of concatenating: ``str + Exception`` raises
        # TypeError, which used to hide the real error.
        status = f"Evaluation{status}\nLeaderboard creation:{e}"
    return status
def init_session():
    """Create a fresh session on page load.

    A random uuid4 string identifies the session; a manager is registered for
    it, and the returned ``gr.State`` carries the uid together with a delete
    callback that removes that manager again.
    """
    session_uid = str(uuid.uuid4())
    MANAGERS.create(session_uid)
    logger.info(f"Started session for {session_uid}")

    def _drop_manager(uid):
        return MANAGERS.remove(uid)

    return gr.State(session_uid, delete_callback=_drop_manager)
def _save_uploaded_files(files, session):
    """Save the uploaded files under the session that triggered the upload.

    Other handlers in this file read ``.value`` from the session argument
    while ``prepare_task`` treats it as a plain uid, so both forms are
    accepted here.
    """
    session_uid = getattr(session, "value", session)
    return save_files(session_uid, [file.name for file in files])


# UI definition: three tabs (Setup, Run Generation, Evaluate) sharing one
# per-session state that is filled in by ``init_session`` on page load.
with gr.Blocks(theme=gr.themes.Default()) as app:
    # We initialize the session state with the user randomly generated uuid
    # Using uuid4 makes collision cases extremely unlikely even for concurrent users
    session_state = gr.State()

    gr.Markdown(project_description)

    with gr.Tabs() as tabs:
        with gr.Tab("Setup", id=0):
            with gr.Row():
                with gr.Accordion("Hugging Face Settings"):
                    login_btn = gr.LoginButton()
                    hf_org_dropdown = gr.Dropdown(
                        choices=[], label="Organization", allow_custom_value=True
                    )
                    # Fill the organization list once the page is loaded.
                    app.load(
                        update_hf_org_dropdown, inputs=None, outputs=hf_org_dropdown
                    )

                    hf_dataset_name = gr.Textbox(
                        label="Dataset name",
                        value="yourbench",
                        info="Name of your new evaluation dataset",
                    )

                with gr.Accordion("Upload documents"):
                    file_input = gr.File(
                        label="Upload text files",
                        file_count="multiple",
                        file_types=[".txt", ".md", ".html", ".pdf"],
                    )
                    output = gr.Textbox(label="Log")
                    # The session state is passed as an event input so the
                    # handler sees the caller's session. Reading
                    # ``session_state.value`` from a closure would only ever
                    # see the component's construction-time default.
                    file_input.upload(
                        _save_uploaded_files,
                        inputs=[file_input, session_state],
                        outputs=output,
                    )

            preview_button = gr.Button("Generate New Config", interactive=False)
            log_message = gr.Textbox(label="Log Message", visible=True)
            download_button = gr.File(
                label="Download Config", visible=False, interactive=False
            )

            # The config can only be generated once at least one file is present.
            file_input.change(enable_button, inputs=file_input, outputs=preview_button)

            preview_button.click(
                generate_and_return,
                inputs=[hf_org_dropdown, hf_dataset_name, session_state],
                outputs=[log_message, download_button],
            )
            preview_button.click(
                switch_to_run_generation_tab,
                inputs=None,
                outputs=tabs,
            )

        with gr.Tab("Run Generation", id=1):
            with gr.Row():
                start_button = gr.Button("Start Task")
                # NOTE(review): prepare_task declares an OAuthToken parameter,
                # which Gradio normally injects on its own — confirm that
                # listing login_btn here maps the inputs onto the intended
                # parameters.
                start_button.click(prepare_task, inputs=[session_state, login_btn, hf_dataset_name])

                stop_button = gr.Button("Stop Task")
                stop_button.click(MANAGERS.stop_process, inputs=session_state)

                kill_button = gr.Button("Kill Task")
                kill_button.click(MANAGERS.kill_process, inputs=session_state)

            with gr.Row():
                with gr.Column():
                    with gr.Accordion("Log Output", open=True):
                        log_output = gr.Code(language=None, lines=20, interactive=False)

                    # Refresh the process status every 2 seconds.
                    process_status = gr.Checkbox(label="Process Status", interactive=False)
                    status_timer = gr.Timer(2.0, active=True)
                    status_timer.tick(update_process_status, inputs=session_state, outputs=process_status)

                with gr.Column():
                    with gr.Accordion("Stages", open=True):
                        stages_table = gr.CheckboxGroup(
                            choices=STAGES,
                            value=[],
                            label="Pipeline Stages Completed",
                            interactive=False,
                        )

                    with gr.Accordion("Ingestion"):
                        ingestion_df = gr.DataFrame()

                    with gr.Accordion("Summarization"):
                        summarization_df = gr.DataFrame()

                    with gr.Accordion("Single-Hop"):
                        single_hop = gr.DataFrame()

                    with gr.Accordion("Answer Generation"):
                        answers_df = gr.DataFrame()

                    # Reload the preview tables whenever the set of completed stages changes.
                    stages_table.change(
                        update_dataset,
                        inputs=[stages_table, hf_org_dropdown, hf_dataset_name],
                        outputs=[ingestion_df, summarization_df, single_hop, answers_df],
                    )

            # TODO: this timer should only be active when the second tab is passed to active for the first time
            log_timer = gr.Timer(1.0, active=True)
            log_timer.tick(
                MANAGERS.read_and_get_output, inputs=session_state, outputs=[log_output, stages_table]
            )

        with gr.Tab("Evaluate", id=2):
            with gr.Row():
                btn_launch_evals = gr.Button("Launch evaluations")
                status = gr.Textbox(label="Status")
                btn_launch_evals.click(run_evaluation_pipeline, [hf_org_dropdown, hf_dataset_name], status)

    # Create the per-session uid and manager when the page loads.
    app.load(init_session, outputs=session_state)

app.launch(allowed_paths=["/app"])