import io
import os
import pathlib
import shutil
import subprocess
import sys

import gradio as gr
import gradiologin as gl
import requests
import yaml
from huggingface_hub import HfApi, whoami
from loguru import logger

from yourbench.pipeline import run_pipeline

# Directory where uploaded documents are staged for the ingestion stage.
UPLOAD_DIRECTORY = pathlib.Path("/app/uploaded_files")
UPLOAD_DIRECTORY.mkdir(parents=True, exist_ok=True)

# Pipeline configuration written by the UI and consumed by the subprocess.
CONFIG_PATH = pathlib.Path("/app/yourbench_config.yml")

logger.remove()
logger.add(sys.stderr, level="INFO")


class SubprocessManager:
    """Manage one long-running subprocess and capture its combined output.

    The process is started detached (new session) with stdout/stderr merged,
    and its pipe is switched to non-blocking mode so a UI timer can poll
    ``read_and_get_output`` without stalling the event loop.
    """

    def __init__(self, command):
        # command: argv list passed verbatim to subprocess.Popen.
        self.command = command
        self.process = None
        # Accumulates everything the process has printed so far.
        self.output_stream = io.StringIO()

    def start_process(self):
        """Start the subprocess; no-op if it is already running."""
        if self.is_running():
            logger.info("Process is already running")
            return

        self.process = subprocess.Popen(
            self.command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # Combine stderr with stdout
            text=True,
            bufsize=1,                 # Line-buffered
            start_new_session=True,    # Start the process in a new session
        )
        # Non-blocking reads let the UI poll output without hanging.
        os.set_blocking(self.process.stdout.fileno(), False)
        logger.info("Started the process")

    def read_and_get_output(self):
        """Drain available subprocess output; return everything captured so far."""
        if self.process and self.process.stdout:
            try:
                # Walrus loop: read until the non-blocking pipe yields nothing.
                while line := self.process.stdout.readline():
                    self.output_stream.write(line)
            except BlockingIOError:
                pass  # No data ready right now — try again on the next poll.
        return self.output_stream.getvalue()

    def stop_process(self):
        """Terminate the subprocess gracefully (SIGTERM) and wait for exit."""
        if not self.is_running():
            # Fixed: previously logged the misleading "Started the process".
            logger.info("Process is not running")
            return
        logger.info("Sending SIGTERM to the Process")
        self.process.terminate()
        exit_code = self.process.wait()  # Wait for process to terminate
        logger.info(f"Process stopped exit code {exit_code}")

    def kill_process(self):
        """Forcefully kill the subprocess (SIGKILL) and wait for it to die."""
        if not self.is_running():
            logger.info("Process is not running")
            return
        logger.info("Sending SIGKILL to the Process")
        self.process.kill()
        exit_code = self.process.wait()  # Wait for process to be killed
        logger.info(f"Process killed exit code {exit_code}")

    def is_running(self):
        """Return True while the subprocess exists and has not exited."""
        return self.process is not None and self.process.poll() is None


# Command used to launch the yourbench pipeline against the saved config.
command = ["uv", "run", "yourbench", f"--config={CONFIG_PATH}"]
manager = SubprocessManager(command)


def generate_config(hf_token: gr.OAuthToken | None, hf_org, model_name, provider,
                    base_url, api_key, max_concurrent_requests):
    """Build the full yourbench pipeline configuration as a YAML string.

    Args:
        hf_token: OAuth token injected by Gradio (via the type annotation),
            or None when the user is not logged in.
        hf_org: Hugging Face organization (or username) to publish under.
        model_name: Model used for every pipeline role.
        provider: Inference provider identifier.
        base_url: Base URL for the inference endpoint.
        api_key: API key for the inference endpoint.
        max_concurrent_requests: Per-model concurrency limit.

    Returns:
        The configuration serialized with ``yaml.dump``.
    """
    # Store the raw token string, not the OAuthToken object: yaml.dump cannot
    # represent arbitrary objects (fixed: previously dumped hf_token directly).
    token = hf_token.token if hf_token is not None else None

    config = {
        "hf_configuration": {
            "token": token,
            "private": True,
            "hf_organization": hf_org,
        },
        "model_list": [{
            "model_name": model_name,
            "provider": provider,
            "base_url": base_url,
            "api_key": api_key,
            "max_concurrent_requests": max_concurrent_requests,
        }],
        # The same model serves every role of the pipeline.
        "model_roles": {role: [model_name] for role in [
            "ingestion",
            "summarization",
            "single_shot_question_generation",
            "multi_hop_question_generation",
            "answer_generation",
            "judge_answers",
        ]},
        # NOTE(review): hard-coded at 16 while model_list uses the UI value —
        # confirm whether this should also follow max_concurrent_requests.
        "inference_config": {"max_concurrent_requests": 16},
        "pipeline": {
            "ingestion": {
                "source_documents_dir": "/app/uploaded_files",
                "output_dir": "/app/ingested",
                "run": True,
            },
            "upload_ingest_to_hub": {
                "source_documents_dir": "/app/ingested",
                "hub_dataset_name": "test_ingested_documents",
                "local_dataset_path": "/app/ingested_dataset",
                "run": True,
            },
            "summarization": {
                "source_dataset_name": "test_ingested_documents",
                "output_dataset_name": "test_summaries",
                "local_dataset_path": "/results/test_summaries",
                "concat_existing_dataset": False,
                "run": True,
            },
            "chunking": {
                "source_dataset_name": "test_summaries",
                "output_dataset_name": "test_chunked_documents",
                "local_dataset_path": "/results/test_chunked_documents",
                "concat_existing_dataset": False,
                "chunking_configuration": {
                    "l_min_tokens": 64,
                    "l_max_tokens": 128,
                    "tau_threshold": 0.3,
                    "h_min": 2,
                    "h_max": 4,
                },
                "run": True,
            },
            "single_shot_question_generation": {
                "source_dataset_name": "test_chunked_documents",
                "output_dataset_name": "test_single_shot_questions",
                "local_dataset_path": "/results/test_single_shot_questions",
                "diversification_seed": "24 year old adult",
                "concat_existing_dataset": False,
                "run": True,
            },
            "multi_hop_question_generation": {
                "source_dataset_name": "test_chunked_documents",
                "output_dataset_name": "test_multi_hop_questions",
                "local_dataset_path": "/results/test_multi_hop_questions",
                "concat_existing_dataset": False,
                "run": True,
            },
            "answer_generation": {
                "run": True,
                "question_dataset_name": "test_single_shot_questions",
                "output_dataset_name": "test_answered_questions",
                "local_dataset_path": "/results/test_answered_questions",
                "concat_existing_dataset": False,
                "strategies": [
                    {
                        "name": "zeroshot",
                        "prompt": "ZEROSHOT_QA_USER_PROMPT",
                        "model_name": model_name,
                    },
                    {
                        "name": "gold",
                        "prompt": "GOLD_QA_USER_PROMPT",
                        "model_name": model_name,
                    },
                ],
            },
            "judge_answers": {
                "run": True,
                "source_judge_dataset_name": "test_answered_questions",
                "output_judged_dataset_name": "test_judged_comparisons",
                "local_dataset_path": "/results/test_judged_comparisons",
                "concat_existing_dataset": False,
                "comparing_strategies": [["zeroshot", "gold"]],
                "chunk_column_index": 0,
                "random_seed": 42,
            },
        },
    }
    return yaml.dump(config, default_flow_style=False)


def save_config(yaml_text):
    """Write the YAML config text to CONFIG_PATH and report success."""
    with open(CONFIG_PATH, "w") as file:
        file.write(yaml_text)
    return "✅ Config saved!"
def save_files(files: list[str]):
    """Move uploaded temp files into UPLOAD_DIRECTORY and report their new paths."""
    saved_paths = [
        shutil.move(str(pathlib.Path(file)), str(UPLOAD_DIRECTORY / pathlib.Path(file).name))
        for file in files
    ]
    return f"Files saved to: {', '.join(saved_paths)}"


def populate_user_info(oauth_profile: gr.OAuthProfile = None, oauth_token: gr.OAuthToken = None):
    """Return (token dropdown, org dropdown, status text) for the logged-in user.

    Both OAuth arguments are injected by Gradio; when either is missing the
    dropdowns show login prompts instead.
    """
    if oauth_profile is None or oauth_token is None:
        # Fixed: gr.Dropdown.update() was removed in Gradio 4.x — return
        # fresh component instances instead (consistent with
        # update_hf_org_dropdown below).
        return (
            gr.Dropdown(choices=["(Please log in to load tokens)"], value=None),
            gr.Dropdown(choices=["(Please log in)"], value=None),
            "🔒 Not logged in",
        )

    username = oauth_profile.username
    org_names = []
    token_names = []
    try:
        headers = {"Authorization": f"Bearer {oauth_token.token}"}
        hf_api = HfApi(token=oauth_token.token)

        # NOTE(review): HfApi does not document a list_tokens() method — this
        # likely raises AttributeError and always takes the fallback branch
        # below; confirm against the huggingface_hub version in use.
        token_data = hf_api.list_tokens()
        for t in token_data:
            name = t.get("name") or f"{t['token'][:4]}...{t['token'][-4:]}"
            token_names.append(name)

        # Fetch user organizations
        orgs = hf_api.get_user_organizations()
        org_names = [org.organization for org in orgs]
    except Exception as e:
        print("Error fetching user/org info:", e)
        # Fall back to a redacted display of the OAuth token itself.
        token_names = [f"{oauth_token.token[:4]}...{oauth_token.token[-4:]}"]

    org_options = [username] + org_names
    default_org = username
    return (
        gr.Dropdown(choices=token_names, value=token_names[0] if token_names else None),
        gr.Dropdown(choices=org_options, value=default_org),
        f"✅ Logged in as {username}",
    )


def hello(profile: gr.OAuthProfile | None) -> str:
    """Greet the logged-in user, or say so when nobody is logged in."""
    if profile is None:
        return "I don't know you."
    return f"Hello {profile.name}"


def update_hf_org_dropdown(oauth_token: gr.OAuthToken | None) -> gr.Dropdown | list:
    """Return a Dropdown of the user's organizations (empty list when logged out).

    Fixed: return annotation was ``-> str`` although the function returns a
    list or a gr.Dropdown component.
    """
    if oauth_token is None:
        print("Please deploy this on Spaces and log in to list organizations.")
        return []
    org_names = [org["name"] for org in whoami(oauth_token.token)["orgs"]]
    return gr.Dropdown(org_names, label="Organization")


with gr.Blocks() as app:
    gr.Markdown("## YourBench Configuration")

    with gr.Row():
        login_btn = gr.LoginButton()
        hello_text = gr.Markdown()
        # Gradio injects the OAuthProfile argument automatically on load.
        app.load(hello, inputs=None, outputs=hello_text)

    with gr.Tab("Configuration"):
        model_name = gr.Textbox(label="Model Name")
        hf_org_dropdown = gr.Dropdown(list(), label="Organization")
        app.load(update_hf_org_dropdown, inputs=None, outputs=hf_org_dropdown)
        provider = gr.Dropdown(["openrouter", "openai", "huggingface"],
                               value="huggingface", label="Provider")
        base_url = gr.Textbox(label="Base URL")
        api_key = gr.Textbox(label="API Key")
        max_concurrent_requests = gr.Dropdown([8, 16, 32], value=16,
                                              label="Max Concurrent Requests")
        config_output = gr.Code(label="Generated Config", language="yaml")
        preview_button = gr.Button("Generate Config")
        save_button = gr.Button("Save Config")

        # hf_token is not listed: Gradio supplies it from generate_config's
        # gr.OAuthToken annotation.
        preview_button.click(
            generate_config,
            inputs=[hf_org_dropdown, model_name, provider, base_url, api_key,
                    max_concurrent_requests],
            outputs=config_output,
        )
        save_button.click(save_config, inputs=[config_output],
                          outputs=[gr.Textbox(label="Save Status")])

    with gr.Tab("Files"):
        file_input = gr.File(label="Upload text files", file_count="multiple",
                             file_types=[".txt", ".md", ".html"])
        output = gr.Textbox(label="Log")
        file_input.upload(save_files, file_input, output)

    with gr.Tab("Run Generation"):
        log_output = gr.Code(label="Log Output", language=None, lines=20,
                             interactive=False)
        start_button = gr.Button("Start Task")
        start_button.click(manager.start_process)
        # Poll the subprocess output ten times a second while the app is open.
        timer = gr.Timer(0.1, active=True)
        timer.tick(manager.read_and_get_output, outputs=log_output)
        kill_button = gr.Button("Kill Task")
        kill_button.click(manager.kill_process)

app.launch()