# advanced/app.py — YourBench configuration and pipeline-runner Gradio app
# Author: Alina Lozovskaya — "Auth [wip]", commit 23510fc (~11.2 kB)
import os
import sys
from huggingface_hub import HfApi
import requests
import pathlib
import subprocess
import shutil
import io
import yaml
import gradio as gr
import gradiologin as gl
from huggingface_hub import whoami
from loguru import logger
from yourbench.pipeline import run_pipeline
# Directory where user-uploaded source documents are staged for ingestion.
UPLOAD_DIRECTORY = pathlib.Path("/app/uploaded_files")
UPLOAD_DIRECTORY.mkdir(parents=True, exist_ok=True)
# Path where the generated YourBench pipeline config is written (and read by the subprocess).
CONFIG_PATH = pathlib.Path("/app/yourbench_config.yml")
# Replace loguru's default sink so only INFO-and-above is emitted to stderr.
logger.remove()
logger.add(sys.stderr, level="INFO")
class SubprocessManager:
    """Manage one long-running subprocess: start it, poll its output, stop or kill it.

    stdout and stderr are combined into a single non-blocking pipe that is
    drained into an in-memory buffer, so a UI timer can poll the accumulated
    log at any time without blocking.
    """
    def __init__(self, command):
        # command: argv list handed to subprocess.Popen (shell=False semantics).
        self.command = command
        self.process = None
        # Accumulates everything the subprocess has written so far.
        self.output_stream = io.StringIO()
    def start_process(self):
        """Start the subprocess; no-op if one is already running."""
        if self.is_running():
            logger.info("Process is already running")
            return
        self.process = subprocess.Popen(
            self.command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # Combine stderr with stdout
            text=True,
            bufsize=1,  # Line-buffered
            start_new_session=True  # Start the process in a new session
        )
        # Non-blocking reads let read_and_get_output() drain whatever is
        # available without stalling the caller (a periodic UI callback).
        os.set_blocking(self.process.stdout.fileno(), False)
        logger.info("Started the process")
    def read_and_get_output(self):
        """Drain any available subprocess output and return the full captured log."""
        if self.process and self.process.stdout:
            try:
                while True:
                    line = self.process.stdout.readline()
                    if line:
                        self.output_stream.write(line)  # Capture in StringIO
                    else:
                        break
            except BlockingIOError:
                # No data available right now; return what we have so far.
                pass
        return self.output_stream.getvalue()
    def stop_process(self):
        """Terminate the subprocess gracefully (SIGTERM); no-op if not running."""
        if not self.is_running():
            # Fixed: previously logged "Started the process" here (copy-paste bug).
            logger.info("Process is not running")
            return
        logger.info("Sending SIGTERM to the Process")
        self.process.terminate()
        exit_code = self.process.wait()  # Wait for process to terminate
        logger.info(f"Process stopped exit code {exit_code}")
    def kill_process(self):
        """Forcefully kill the subprocess (SIGKILL); no-op if not running."""
        if not self.is_running():
            logger.info("Process is not running")
            return
        logger.info("Sending SIGKILL to the Process")
        self.process.kill()
        exit_code = self.process.wait()  # Wait for process to be killed
        logger.info(f"Process killed exit code {exit_code}")
    def is_running(self):
        """Return True if a subprocess was started and has not yet exited."""
        return self.process and self.process.poll() is None
# Command that launches the YourBench pipeline against the generated config file.
command = ["uv", "run", "yourbench", f"--config={CONFIG_PATH}"]
# Single shared manager instance, wired to the Start/Kill buttons in the UI below.
manager = SubprocessManager(command)
def generate_config(hf_token: gr.OAuthToken | None, hf_org, model_name, provider, base_url, api_key, max_concurrent_requests):
    """Build a YourBench pipeline configuration and return it as a YAML string.

    Args:
        hf_token: OAuth token injected by Gradio (or None when logged out).
        hf_org: Hugging Face organization to publish datasets under.
        model_name: Model used for every pipeline role.
        provider: Inference provider name (e.g. "huggingface", "openai").
        base_url: Base URL for the inference endpoint.
        api_key: API key for the provider.
        max_concurrent_requests: Concurrency limit for inference calls.

    Returns:
        The configuration serialized with yaml.dump (block style).
    """
    # gr.OAuthToken wraps the raw token string; store the plain string so the
    # YAML contains a clean scalar rather than a serialized Python object.
    token = getattr(hf_token, "token", hf_token)
    config = {
        "hf_configuration": {
            "token": token,
            "private": True,
            "hf_organization": hf_org
        },
        "model_list": [{
            "model_name": model_name,
            "provider": provider,
            "base_url": base_url,
            "api_key": api_key,
            "max_concurrent_requests": max_concurrent_requests
        }],
        # The same model serves every pipeline role.
        "model_roles": {role: [model_name] for role in [
            "ingestion", "summarization", "single_shot_question_generation",
            "multi_hop_question_generation", "answer_generation", "judge_answers"
        ]},
        # Fixed: previously hard-coded to 16, silently ignoring the UI setting.
        "inference_config": {"max_concurrent_requests": max_concurrent_requests},
        "pipeline": {
            "ingestion": {
                "source_documents_dir": "/app/uploaded_files",
                "output_dir": "/app/ingested",
                "run": True
            },
            "upload_ingest_to_hub": {
                "source_documents_dir": "/app/ingested",
                "hub_dataset_name": "test_ingested_documents",
                "local_dataset_path": "/app/ingested_dataset",
                "run": True
            },
            "summarization": {
                "source_dataset_name": "test_ingested_documents",
                "output_dataset_name": "test_summaries",
                "local_dataset_path": "/results/test_summaries",
                "concat_existing_dataset": False,
                "run": True
            },
            "chunking": {
                "source_dataset_name": "test_summaries",
                "output_dataset_name": "test_chunked_documents",
                "local_dataset_path": "/results/test_chunked_documents",
                "concat_existing_dataset": False,
                "chunking_configuration": {
                    "l_min_tokens": 64,
                    "l_max_tokens": 128,
                    "tau_threshold": 0.3,
                    "h_min": 2,
                    "h_max": 4
                },
                "run": True
            },
            "single_shot_question_generation": {
                "source_dataset_name": "test_chunked_documents",
                "output_dataset_name": "test_single_shot_questions",
                "local_dataset_path": "/results/test_single_shot_questions",
                "diversification_seed": "24 year old adult",
                "concat_existing_dataset": False,
                "run": True
            },
            "multi_hop_question_generation": {
                "source_dataset_name": "test_chunked_documents",
                "output_dataset_name": "test_multi_hop_questions",
                "local_dataset_path": "/results/test_multi_hop_questions",
                "concat_existing_dataset": False,
                "run": True
            },
            "answer_generation": {
                "run": True,
                "question_dataset_name": "test_single_shot_questions",
                "output_dataset_name": "test_answered_questions",
                "local_dataset_path": "/results/test_answered_questions",
                "concat_existing_dataset": False,
                # Two answering strategies compared later by judge_answers.
                "strategies": [{
                    "name": "zeroshot",
                    "prompt": "ZEROSHOT_QA_USER_PROMPT",
                    "model_name": model_name
                }, {
                    "name": "gold",
                    "prompt": "GOLD_QA_USER_PROMPT",
                    "model_name": model_name
                }]
            },
            "judge_answers": {
                "run": True,
                "source_judge_dataset_name": "test_answered_questions",
                "output_judged_dataset_name": "test_judged_comparisons",
                "local_dataset_path": "/results/test_judged_comparisons",
                "concat_existing_dataset": False,
                "comparing_strategies": [["zeroshot", "gold"]],
                "chunk_column_index": 0,
                "random_seed": 42
            }
        }
    }
    return yaml.dump(config, default_flow_style=False)
def save_config(yaml_text):
    """Persist the (possibly hand-edited) YAML text to CONFIG_PATH."""
    CONFIG_PATH.write_text(yaml_text)
    return "✅ Config saved!"
def save_files(files: list[str]):
    """Move uploaded temp files into UPLOAD_DIRECTORY and report where they landed."""
    saved_paths = []
    for uploaded in files:
        source = pathlib.Path(uploaded)
        destination = UPLOAD_DIRECTORY / source.name
        saved_paths.append(shutil.move(str(source), str(destination)))
    return f"Files saved to: {', '.join(saved_paths)}"
def populate_user_info(oauth_profile: gr.OAuthProfile = None, oauth_token: gr.OAuthToken = None):
    """Populate the token/org dropdowns and the login-status line after OAuth login.

    Args:
        oauth_profile: Profile injected by Gradio; None when logged out.
        oauth_token: Token injected by Gradio; None when logged out.

    Returns:
        A 3-tuple: (token-dropdown update, org-dropdown update, status markdown).
    """
    # NOTE(review): gr.Dropdown.update() was removed in Gradio 4.x in favor of
    # gr.update()/gr.Dropdown(...) — confirm against the pinned gradio version.
    if oauth_profile is None or oauth_token is None:
        return (
            gr.Dropdown.update(choices=["(Please log in to load tokens)"], value=None),
            gr.Dropdown.update(choices=["(Please log in)"], value=None),
            "🔒 Not logged in"
        )
    username = oauth_profile.username
    org_names = []
    token_names = []
    try:
        hf_api = HfApi(token=oauth_token.token)
        # Fetch all user tokens.
        # NOTE(review): HfApi.list_tokens() is not part of the documented public
        # huggingface_hub API — verify it exists in the pinned version.
        token_data = hf_api.list_tokens()
        for t in token_data:
            # Unnamed tokens fall back to a redacted preview of the token value.
            name = t.get("name") or f"{t['token'][:4]}...{t['token'][-4:]}"
            token_names.append(name)
        # Fetch user organizations.
        orgs = hf_api.get_user_organizations()
        org_names = [org.organization for org in orgs]
    except Exception as e:
        # Best effort: on API failure, show only the redacted OAuth token.
        logger.error(f"Error fetching user/org info: {e}")
        token_names = [f"{oauth_token.token[:4]}...{oauth_token.token[-4:]}"]
    # The personal namespace is always offered and selected by default.
    org_options = [username] + org_names
    default_org = username
    return (
        gr.Dropdown.update(choices=token_names, value=token_names[0] if token_names else None),
        gr.Dropdown.update(choices=org_options, value=default_org),
        f"✅ Logged in as {username}"
    )
def hello(profile: gr.OAuthProfile | None) -> str:
    """Greet the logged-in user by name, or admit we don't know them."""
    if profile is not None:
        return f"Hello {profile.name}"
    return "I don't know you."
def update_hf_org_dropdown(oauth_token: gr.OAuthToken | None):
    """Return a Dropdown listing the logged-in user's organizations.

    Returns an empty list when no OAuth token is available (e.g. running
    outside a logged-in Space), leaving the dropdown unpopulated.
    Fixed: the previous ``-> str`` annotation was wrong — this returns either
    a list or a gr.Dropdown.
    """
    if oauth_token is None:
        # Consistency: use the module logger instead of print().
        logger.info("Please deploy this on Spaces and log in to list organizations.")
        return []
    # whoami() reports the orgs the token's user belongs to.
    org_names = [org["name"] for org in whoami(oauth_token.token)["orgs"]]
    return gr.Dropdown(org_names, label="Organization")
# --- Gradio UI wiring (component creation order defines layout) -------------
with gr.Blocks() as app:
    gr.Markdown("## YourBench Configuration")
    with gr.Row():
        login_btn = gr.LoginButton()
        hello_text = gr.Markdown()
    # Gradio auto-injects the gr.OAuthProfile parameter of `hello` on load.
    app.load(hello, inputs=None, outputs=hello_text)
    with gr.Tab("Configuration"):
        model_name = gr.Textbox(label="Model Name")
        hf_org_dropdown = gr.Dropdown(list(), label="Organization")
        # Populate the org choices once the page loads (token auto-injected).
        app.load(update_hf_org_dropdown, inputs=None, outputs=hf_org_dropdown)
        provider = gr.Dropdown(["openrouter", "openai", "huggingface"], value="huggingface", label="Provider")
        base_url = gr.Textbox(label="Base URL")
        api_key = gr.Textbox(label="API Key")
        max_concurrent_requests = gr.Dropdown([8, 16, 32], value=16, label="Max Concurrent Requests")
        config_output = gr.Code(label="Generated Config", language="yaml")
        preview_button = gr.Button("Generate Config")
        save_button = gr.Button("Save Config")
        # generate_config's gr.OAuthToken first parameter is injected by
        # Gradio, so only the six visible inputs are listed here.
        preview_button.click(
            generate_config,
            inputs=[hf_org_dropdown, model_name, provider, base_url, api_key, max_concurrent_requests],
            outputs=config_output
        )
        save_button.click(save_config, inputs=[config_output], outputs=[gr.Textbox(label="Save Status")])
    with gr.Tab("Files"):
        file_input = gr.File(label="Upload text files", file_count="multiple", file_types=[".txt", ".md", ".html"])
        output = gr.Textbox(label="Log")
        file_input.upload(save_files, file_input, output)
    with gr.Tab("Run Generation"):
        log_output = gr.Code(label="Log Output", language=None, lines=20, interactive=False)
        start_button = gr.Button("Start Task")
        start_button.click(manager.start_process)
        # Poll the subprocess buffer every 100 ms and mirror it into the log pane.
        timer = gr.Timer(0.1, active=True)
        timer.tick(manager.read_and_get_output, outputs=log_output)
        kill_button = gr.Button("Kill Task")
        kill_button.click(manager.kill_process)
app.launch()